Skip to content

Commit

Permalink
add metablk recovery support for create_shard
Browse files Browse the repository at this point in the history
  • Loading branch information
zichanglai committed Sep 5, 2023
1 parent d933d65 commit 9d064e8
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 68 deletions.
2 changes: 2 additions & 0 deletions src/include/homeobject/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ using peer_id = boost::uuids::uuid;
using pg_id = uint16_t;
using shard_id = uint64_t;

static constexpr uint32_t init_crc32 = 0;

template < class E >
class Manager {
public:
Expand Down
1 change: 1 addition & 0 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class HomeObjectImpl : public HomeObject,
ShardManager::AsyncResult< ShardInfo > create_shard(pg_id pg_owner, uint64_t size_bytes) final;
ShardManager::AsyncResult< InfoList > list_shards(pg_id pg) const final;
ShardManager::AsyncResult< ShardInfo > seal_shard(shard_id id) final;
uint64_t get_current_timestamp();

static const std::string s_shard_info_sub_type;
/// BlobManager
Expand Down
25 changes: 5 additions & 20 deletions src/lib/homestore/homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,15 @@ HSHomeObject::HSHomeObject(std::weak_ptr< HomeObjectApplication >&& application)
}

void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
const char* shard_info_json_str = r_cast<const char*>(buf.bytes());
auto shard_info_json = nlohmann::json::parse(shard_info_json_str, shard_info_json_str + size);
std::string shard_info_str;
shard_info_str.append(r_cast<const char*>(buf.bytes()), size);

CShardInfo shard_info = std::make_shared<ShardInfo>();
shard_info->id = shard_info_json["shard_id"].get<shard_id>();
shard_info->placement_group = shard_info_json["pg_id"].get<pg_id>();
shard_info->state = static_cast<ShardInfo::State>(shard_info_json["state"].get<int>());
shard_info->total_capacity_bytes = shard_info_json["capacity"].get<uint64_t>();

std::scoped_lock lock_guard(_pg_lock, _shard_lock);
CShardInfo shard_info = deserialize_shard_info(shard_info_str);
if (shard_info->state == ShardInfo::State::OPEN) {
// create shard;
auto pg_iter = _pg_map.find(shard_info->placement_group);
RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info");
pg_iter->second.shards.push_back(shard_info);
auto sequence_num = get_sequence_num_from_shard_id(shard_info->id);
if (sequence_num > pg_iter->second.next_sequence_num_for_new_shard) {
pg_iter->second.next_sequence_num_for_new_shard = sequence_num;
}
_shard_map[shard_info->id] = shard_info;
do_commit_new_shard(shard_info);
} else {
auto shard_iter = _shard_map.find(shard_info->id);
RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info");
shard_iter->second = shard_info;
do_commit_seal_shard(shard_info);
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/lib/homestore/homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@ class HSHomeObject : public HomeObjectImpl {
shard_id generate_new_shard_id(pg_id pg);
shard_id make_new_shard_id(pg_id pg, uint64_t sequence_num) const;
uint64_t get_sequence_num_from_shard_id(uint64_t shard_id);
std::string prepare_create_shard_message(pg_id pg, shard_id new_shard_id, uint64_t shard_size) const;
std::string serialize_shard_info(CShardInfo shard_info) const;
CShardInfo deserialize_shard_info(const std::string& shard_info_str) const;
void do_commit_new_shard(CShardInfo shard_info);
void do_commit_seal_shard(CShardInfo shard_info);
public:
using HomeObjectImpl::HomeObjectImpl;
HSHomeObject(std::weak_ptr< HomeObjectApplication >&& application);
~HSHomeObject() override = default;
void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size);
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
sisl::sg_list const& value, void* user_ctx);
void* user_ctx);
};

} // namespace homeobject
12 changes: 8 additions & 4 deletions src/lib/homestore/replication_message.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "homeobject/common.hpp"

#include <sisl/utility/enum.hpp>
#include <isa-l/crc.h>

Expand All @@ -9,16 +11,18 @@ ENUM(ReplicationMessageType, uint16_t, PG_MESSAGE = 0, SHARD_MESSAGE, BLOB_MESSA

// magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum'
static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34;
static constexpr uint32_t HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1 = 0x01;

struct ReplicationMessageHeader {
uint64_t magic_num{HOMEOBJECT_REPLICATION_MAGIC};
uint32_t protocol_version{HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1};
ReplicationMessageType message_type;
uint32_t message_size;
uint32_t message_crc;
uint8_t reserved_pad[2]{};
uint32_t payload_size;
uint32_t payload_crc;
uint8_t reserved_pad[6]{};
uint32_t header_crc;
uint32_t calculate_crc() const {
return crc32_ieee(0, r_cast<const unsigned char*>(this), sizeof(*this) - sizeof(header_crc));
return crc32_ieee(init_crc32, r_cast<const unsigned char*>(this), sizeof(*this) - sizeof(header_crc));
}
};

Expand Down
5 changes: 2 additions & 3 deletions src/lib/homestore/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ namespace homeobject {

void ReplicationStateMachine::on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
blkid_list_t const& blkids,void* ctx) {
LOGINFO("applying raft log commit with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast<const ReplicationMessageHeader*>(header.bytes);
//TODO: read data from pba or get data from replication context(prefered)
sisl::sg_list value;

switch (msg_header->message_type) {
case SHARD_MESSAGE: {
home_object->on_shard_message_commit(lsn, header, key, value, ctx);
home_object->on_shard_message_commit(lsn, header, key,ctx);
}
case PG_MESSAGE:
case BLOB_MESSAGE:
Expand Down
106 changes: 74 additions & 32 deletions src/lib/homestore/shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,34 @@ shard_id HSHomeObject::make_new_shard_id(pg_id pg, uint64_t sequence_num) const
uint64_t HSHomeObject::get_sequence_num_from_shard_id(uint64_t shard_id) {
return shard_id & (0x0000FFFFFFFFFFFF);
}
std::string HSHomeObject::prepare_create_shard_message(pg_id pg, shard_id new_shard_id, uint64_t shard_size) const {

std::string HSHomeObject::serialize_shard_info(CShardInfo shard_info) const {
nlohmann::json j;
j["pg_id"] = pg;
j["shard_id"] = new_shard_id;
j["state"] = ShardInfo::State::OPEN;
j["capacity"] = shard_size;
j["shard_id"] = shard_info->id;
j["pg_id"] = shard_info->placement_group;
j["state"] = shard_info->state;
j["created_time"] = shard_info->created_time;
j["modified_time"] = shard_info->last_modified_time;
j["total_capacity"] = shard_info->total_capacity_bytes;
j["available_capacity"] = shard_info->available_capacity_bytes;
j["deleted_capacity"] = shard_info->deleted_capacity_bytes;
return j.dump();
}

CShardInfo HSHomeObject:: deserialize_shard_info(const std::string& shard_info_str) const {
auto shard_info_json = nlohmann::json::parse(shard_info_str);
CShardInfo shard_info = std::make_shared<ShardInfo>();
shard_info->id = shard_info_json["shard_id"].get<shard_id>();
shard_info->placement_group = shard_info_json["pg_id"].get<pg_id>();
shard_info->state = static_cast<ShardInfo::State>(shard_info_json["state"].get<int>());
shard_info->created_time = shard_info_json["created_time"].get<uint64_t>();
shard_info->last_modified_time = shard_info_json["modified_time"].get<uint64_t>();
shard_info->available_capacity_bytes = shard_info_json["available_capacity"].get<uint64_t>();
shard_info->total_capacity_bytes = shard_info_json["total_capacity"].get<uint64_t>();
shard_info->deleted_capacity_bytes = shard_info_json["deleted_capacity"].get<uint64_t>();
return shard_info;
}

ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id pg_owner, uint64_t size_bytes) {
bool pg_exist = check_if_pg_exist(pg_owner);
if (!pg_exist) {
Expand All @@ -62,26 +81,34 @@ ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id pg_owner, ui
}

auto new_shard_id = generate_new_shard_id(pg_owner);
std::string create_shard_message = prepare_create_shard_message(pg_owner, new_shard_id, size_bytes);
ReplicationMessageHeader header;
header.message_type = ReplicationMessageType::SHARD_MESSAGE;
header.message_size = create_shard_message.size();
header.message_crc = crc32_ieee(0, r_cast< const uint8_t* >(create_shard_message.c_str()), create_shard_message.size());
header.header_crc = header.calculate_crc();
auto create_time = get_current_timestamp();
auto shard_info = std::make_shared<ShardInfo>(ShardInfo(new_shard_id, pg_owner, ShardInfo::State::OPEN,
create_time, create_time, size_bytes, size_bytes, 0));
std::string create_shard_message = serialize_shard_info(shard_info);
//preapre msg header;
const uint32_t needed_size = sizeof(ReplicationMessageHeader) + create_shard_message.size();
auto buf = nuraft::buffer::alloc(needed_size);
uint8_t* raw_ptr = static_cast<uint8_t*>(buf->data_begin());
ReplicationMessageHeader *header = new(raw_ptr) ReplicationMessageHeader();
header->message_type = ReplicationMessageType::SHARD_MESSAGE;
header->payload_size = create_shard_message.size();
header->payload_crc = crc32_ieee(init_crc32, r_cast< const uint8_t* >(create_shard_message.c_str()), create_shard_message.size());
header->header_crc = header->calculate_crc();
raw_ptr += sizeof(ReplicationMessageHeader);
std::memcpy(raw_ptr, create_shard_message.c_str(), create_shard_message.size());

sisl::sg_list value;
value.size = create_shard_message.size();
value.iovs.emplace_back(iovec{static_cast<void*>(const_cast<char*>(create_shard_message.c_str())),
create_shard_message.size()});
//replicate this create shard message to PG members;
auto [p, sf] = folly::makePromiseContract< ShardManager::Result<ShardInfo> >();
repl_dev->async_alloc_write(sisl::blob(uintptr_cast(&header), sizeof(header)),
repl_dev->async_alloc_write(sisl::blob(buf->data_begin(), needed_size),
sisl::blob(),
value,
static_cast<void*>(&p));
auto info = std::move(sf).via(folly::getGlobalCPUExecutor()).get();
if (!bool(info)) {
LOGWARN("create new shard [{}] on pg [{}] is failed with error:{}", new_shard_id, pg_owner, info.error());
}
header->~ReplicationMessageHeader();
return info;
}

Expand All @@ -98,7 +125,7 @@ ShardManager::Result< InfoList > HSHomeObject::_list_shards(pg_id pg) const {
}

void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
sisl::sg_list const& value, void* user_ctx) {
void* user_ctx) {
const ReplicationMessageHeader* msg_header = r_cast<const ReplicationMessageHeader*>(header.bytes);
folly::Promise<ShardManager::Result<ShardInfo> > *promise = nullptr;
// user_ctx will be nullptr when:
Expand All @@ -117,39 +144,54 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header
}

std::string string_value;
for (auto & iovec: value.iovs) {
string_value.append(static_cast<char*>(iovec.iov_base), iovec.iov_len);
}
string_value.append(r_cast<char*>(header.bytes + sizeof(ReplicationMessageHeader)), msg_header->payload_size);

auto crc = crc32_ieee(0, r_cast< const uint8_t* >(string_value.c_str()), string_value.size());
if (msg_header->message_crc != crc) {
auto crc = crc32_ieee(init_crc32, r_cast< const uint8_t* >(string_value.c_str()), string_value.size());
if (msg_header->payload_crc != crc) {
LOGWARN("replication message body is corrupted with crc error, lsn:{}", lsn);
if (promise) {
promise->setValue(folly::makeUnexpected(ShardError::INVALID_ARG));
}
return;
}

//preapre ShardInfoExt msg and persisted this ShardInfoExt to homestore MetaBlkService;
//TODO: preapre ShardInfoExt msg with chunk_id and persisted the ShardInfoExt to homestore MetaBlkService;
void* cookie = nullptr;
homestore::hs()->meta_service().add_sub_sb(HomeObjectImpl::s_shard_info_sub_type, r_cast<const uint8_t*>(string_value.c_str()), string_value.size(), cookie);

//update in-memory shard map;
CShardInfo shard_info = std::make_shared<ShardInfo>();
auto shard_info_json = nlohmann::json::parse(string_value);
shard_info->id = shard_info_json["shard_id"].get<shard_id>();
shard_info->placement_group = shard_info_json["pg_id"].get<pg_id>();
shard_info->state = static_cast<ShardInfo::State>(shard_info_json["state"].get<int>());
shard_info->total_capacity_bytes = shard_info_json["capacity"].get<uint64_t>();
std::scoped_lock lock_guard(_pg_lock, _shard_lock);
CShardInfo shard_info = deserialize_shard_info(string_value);
if (shard_info->state == ShardInfo::State::OPEN) {
//create shard;
do_commit_new_shard(shard_info);
} else {
do_commit_seal_shard(shard_info);
}

if (promise) {
promise->setValue(ShardManager::Result<ShardInfo>(*shard_info));
}
}

void HSHomeObject::do_commit_new_shard(CShardInfo shard_info) {
std::scoped_lock lock_guard(_pg_lock, _shard_lock);
auto pg_iter = _pg_map.find(shard_info->placement_group);
RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info");
pg_iter->second.shards.push_back(shard_info);
_shard_map[shard_info->id] = shard_info;

if (promise) {
promise->setValue(ShardManager::Result<ShardInfo>(*shard_info));
//following part is must for follower members or when member is restarted;
auto sequence_num = get_sequence_num_from_shard_id(shard_info->id);
if (sequence_num > pg_iter->second.next_sequence_num_for_new_shard) {
pg_iter->second.next_sequence_num_for_new_shard = sequence_num;
}
_shard_map[shard_info->id] = shard_info;
}

void HSHomeObject::do_commit_seal_shard(CShardInfo shard_info) {
std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(shard_info->id);
RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info");
shard_iter->second = shard_info;
}

} // namespace homeobject
7 changes: 0 additions & 7 deletions src/lib/memory/shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,6 @@ namespace homeobject {

uint64_t ShardManager::max_shard_size() { return Gi; }

static uint64_t get_current_timestamp() {
auto now = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast< std::chrono::milliseconds >(now.time_since_epoch());
auto timestamp = static_cast< uint64_t >(duration.count());
return timestamp;
}

ShardManager::Result< ShardInfo > MemoryHomeObject::_create_shard(pg_id pg_owner, uint64_t size_bytes) {
auto lg = std::scoped_lock(_pg_lock, _shard_lock);
auto pg_it = _pg_map.find(pg_owner);
Expand Down
7 changes: 7 additions & 0 deletions src/lib/shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,11 @@ ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::seal_shard(shard_id id) {

ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::get_shard(shard_id id) const { return _get_shard(id); }

uint64_t HomeObjectImpl::get_current_timestamp() {
auto now = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast< std::chrono::milliseconds >(now.time_since_epoch());
auto timestamp = static_cast< uint64_t >(duration.count());
return timestamp;
}

} // namespace homeobject

0 comments on commit 9d064e8

Please sign in to comment.