Skip to content

Commit

Permalink
adapt create_shard to use ReplDev of homestore
Browse files Browse the repository at this point in the history
  • Loading branch information
zichanglai committed Sep 23, 2023
1 parent 427ed24 commit 68e3650
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 355 deletions.
1 change: 1 addition & 0 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ struct PG {
PGInfo pg_info;
uint64_t shard_sequence_num{0};
ShardList shards;
boost::uuids::uuid repl_dev_uuid; // will be filled when pg_creation is based on HS SoloReplDev
};

class HomeObjectImpl : public HomeObject,
Expand Down
40 changes: 28 additions & 12 deletions src/lib/homestore/homeobject.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "homeobject.hpp"
#include "replication_state_machine.hpp"

#include <homestore/homestore.hpp>
#include <homestore/index_service.hpp>
Expand Down Expand Up @@ -38,21 +39,27 @@ void HSHomeObject::init_homestore() {
device_info.emplace_back(std::filesystem::canonical(path).string(), homestore::HSDevType::Data);
}

/// TODO need Repl service eventually and use HeapChunkSelector
_chunk_selector = std::make_shared< HeapChunkSelector >();

using namespace homestore;
bool need_format = HomeStore::instance()->with_data_service(nullptr).with_log_service().start(
hs_input_params{.devices = device_info, .app_mem_size = app_mem_size},
[this]() { register_homestore_metablk_callback(); });
auto hsi = HomeStore::instance();
// When the replication service of homestore is enabled, the log service is enabled automatically,
// and the meta service is already enabled by default.
hsi->with_repl_data_service(repl_impl_type::solo, std::make_unique< HOReplServiceCallbacks >(this),
_chunk_selector);

bool need_format = hsi->start(hs_input_params{.devices = device_info, .app_mem_size = app_mem_size},
[this]() { register_homestore_metablk_callback(); });

/// TODO how should this work?
LOGWARN("Persistence Looks Vacant, Formatting!!");
if (need_format) {
HomeStore::instance()->format_and_start(std::map< uint32_t, hs_format_params >{
hsi->format_and_start(std::map< uint32_t, hs_format_params >{
{HS_SERVICE::META, hs_format_params{.size_pct = 5.0}},
{HS_SERVICE::LOG_REPLICATED, hs_format_params{.size_pct = 10.0}},
{HS_SERVICE::LOG_LOCAL, hs_format_params{.size_pct = 5.0}},
{HS_SERVICE::DATA, hs_format_params{.size_pct = 50.0}},
{HS_SERVICE::INDEX, hs_format_params{.size_pct = 30.0}},
{HS_SERVICE::REPLICATION, hs_format_params{.size_pct = 50.0}},
});
}
}
Expand All @@ -65,7 +72,7 @@ void HSHomeObject::register_homestore_metablk_callback() {
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_shard_meta_blk_found(mblk, buf, size);
},
nullptr, true);
[this](bool success) { on_shard_meta_blk_recover_completed(success); }, true);
}

void HomeObjectImpl::init_repl_svc() {
Expand All @@ -86,16 +93,25 @@ HSHomeObject::~HSHomeObject() {
}

/// Callback invoked for a shard-info metablk.
///
/// Deserializes the shard record held in the first `size` bytes of `buf`, stores `mblk` as the shard's
/// metablk cookie, and hands the shard to do_commit_new_shard().
///
/// NOTE(review): `shard` is declared twice below -- once from a temporary std::string and once directly
/// from the buffer pointer/size. This looks like the before/after lines of a change shown together;
/// confirm which one is meant to remain.
///
/// @param mblk  metablk the record was read from; kept as `shard.metablk_cookie`
/// @param buf   buffer holding the serialized shard info
/// @param size  number of bytes of `buf` passed to deserialize_shard()
void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
std::string shard_info_str;
shard_info_str.append(r_cast< const char* >(buf.bytes()), size);

auto shard = deserialize_shard(shard_info_str);
auto shard = deserialize_shard(r_cast< const char* >(buf.bytes()), size);
shard.metablk_cookie = mblk;

// As shard info in the homestore metablk is always the latest state(OPEN or SEALED),
// we can always create a shard from this shard info and once shard is deleted, the associated metablk will be
// deleted too.
do_commit_new_shard(shard);
}

void HSHomeObject::on_shard_meta_blk_recover_completed(bool success) {
// Find all shard with opening state and excluede their binding chunks from the HeapChunkSelector;
std::unordered_set< homestore::chunk_num_t > excluding_chunks;
std::scoped_lock lock_guard(_pg_lock);
for (auto& pair : _pg_map) {
for (auto& shard : pair.second.shards) {
if (shard.info.state == ShardInfo::State::OPEN) { excluding_chunks.emplace(shard.chunk_id); }
}
}

_chunk_selector->build_per_dev_chunk_heap(excluding_chunks);
}

} // namespace homeobject
29 changes: 18 additions & 11 deletions src/lib/homestore/homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
#include <mutex>

#include "mocks/repl_service.h"

#include "lib/homeobject_impl.hpp"
#include "heap_chunk_selector.h"

#include <homestore/homestore.hpp>
#include <homestore/replication/repl_dev.h>

namespace homestore {
struct meta_blk;
Expand All @@ -14,6 +17,9 @@ struct meta_blk;
namespace homeobject {

class HSHomeObject : public HomeObjectImpl {
std::shared_ptr< HeapChunkSelector > _chunk_selector;

private:
/// Overridable Helpers
ShardManager::Result< ShardInfo > _create_shard(pg_id, uint64_t size_bytes) override;
ShardManager::Result< ShardInfo > _seal_shard(shard_id) override;
Expand All @@ -22,33 +28,34 @@ class HSHomeObject : public HomeObjectImpl {
BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id) const override;
BlobManager::NullResult _del_blob(ShardInfo const&, blob_id) override;
///
mutable std::shared_mutex _flying_shard_lock;
std::map< int64_t, Shard > _flying_shards;

private:
shard_id generate_new_shard_id(pg_id pg);
uint64_t get_sequence_num_from_shard_id(uint64_t shard_id);
std::string serialize_shard(const Shard& shard) const;
Shard deserialize_shard(const std::string& shard_info_str) const;
Shard deserialize_shard(const char* json_str, size_t str_size) const;
void do_commit_new_shard(const Shard& shard);
void do_commit_seal_shard(const Shard& shard);
void register_homestore_metablk_callback();
void* get_shard_metablk(shard_id id);

public:
using HomeObjectImpl::HomeObjectImpl;
~HSHomeObject();

void init_homestore();

static const std::string s_shard_info_sub_type;
void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size);
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
homestore::MultiBlkId const& blkids, void* user_ctx,
homestore::cshared< homestore::ReplDev > repl_dev);

void* get_shard_metablk(shard_id id) const;

bool precheck_and_decode_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, std::string* msg);
ShardManager::Result< uint16_t > get_shard_chunk(shard_id id) const;

void on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* user_ctx);
void on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* user_ctx);
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* user_ctx);
// Recovery part
static const std::string s_shard_info_sub_type;
void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size);
void on_shard_meta_blk_recover_completed(bool success);
};

} // namespace homeobject
9 changes: 6 additions & 3 deletions src/lib/homestore/replication_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@

namespace homeobject {

ENUM(ReplicationMessageType, uint16_t, PG_MESSAGE = 0, SHARD_MESSAGE, BLOB_MESSAGE, UNKNOWN_MESSAGE);
ENUM(ReplicationMessageType, uint16_t, CREATE_SHARD_MSG, SEAL_SHARD_MSG, PUT_BLOB_MSG, DEL_BLOB_MSG, UNKNOWN_MESSAGE);

// magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum'
static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34;
static constexpr uint32_t HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1 = 0x01;
static constexpr uint32_t init_crc32 = 0;

using replication_group_id = pg_id;

#pragma pack(1)
struct ReplicationMessageHeader {
uint64_t magic_num{HOMEOBJECT_REPLICATION_MAGIC};
uint32_t protocol_version{HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1};
ReplicationMessageType message_type;
replication_group_id repl_group_id; // replication group upon which the msg is being replicated;
ReplicationMessageType msg_type; // message type
uint32_t payload_size;
uint32_t payload_crc;
uint8_t reserved_pad[6]{};
uint8_t reserved_pad[4]{};
uint32_t header_crc;
uint32_t calculate_crc() const {
return crc32_ieee(init_crc32, r_cast< const unsigned char* >(this), sizeof(*this) - sizeof(header_crc));
Expand Down
72 changes: 46 additions & 26 deletions src/lib/homestore/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@

namespace homeobject {

/// Creates a ReplicationStateMachine bound to this object's HSHomeObject and to `repl_dev`, and returns
/// it through the ReplDevListener interface.
///
/// @param repl_dev  the ReplDev the new listener is constructed with
/// @return owning pointer to the newly created listener
std::unique_ptr< homestore::ReplDevListener >
HOReplServiceCallbacks::on_repl_dev_init(homestore::cshared< homestore::ReplDev >& repl_dev) {
    std::unique_ptr< homestore::ReplDevListener > listener =
        std::make_unique< ReplicationStateMachine >(_home_object, repl_dev);
    return listener;
}

void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
const home_replication::pba_list_t& pbas, void* ctx) {
homestore::MultiBlkId const& blkids, void* ctx) {
LOGINFO("applying raft log commit with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.bytes);

switch (msg_header->message_type) {
case ReplicationMessageType::SHARD_MESSAGE: {
_home_object->on_shard_message_commit(lsn, header, key, ctx);
switch (msg_header->msg_type) {
case ReplicationMessageType::CREATE_SHARD_MSG:
case ReplicationMessageType::SEAL_SHARD_MSG: {
_home_object->on_shard_message_commit(lsn, header, key, blkids, ctx, _repl_dev);
break;
}
case ReplicationMessageType::PG_MESSAGE:
case ReplicationMessageType::BLOB_MESSAGE:
case ReplicationMessageType::PUT_BLOB_MSG:
case ReplicationMessageType::DEL_BLOB_MSG:
default: {
break;
}
Expand All @@ -23,36 +28,51 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c

void ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {
LOGINFO("on_pre_commit with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.bytes);

switch (msg_header->message_type) {
case ReplicationMessageType::SHARD_MESSAGE: {
_home_object->on_pre_commit_shard_msg(lsn, header, key, ctx);
break;
}
case ReplicationMessageType::PG_MESSAGE:
case ReplicationMessageType::BLOB_MESSAGE:
default: {
break;
}
}
// For shard creation, the homestore ReplDev has already written the shard header to the data service before
// this function is called, so there is nothing to do here; the chunk_id bound to the new shard can be
// obtained from the blkid in on_commit().
}

void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {
LOGINFO("rollback with lsn:{}", lsn);
LOGINFO("on_rollback with lsn:{}", lsn);
}

homestore::blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, void* user_ctx) {
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.bytes);
if (msg_header->header_crc != msg_header->calculate_crc()) {
LOGWARN("replication message header is corrupted with crc error and can not get blk alloc hints");
return homestore::blk_alloc_hints();
}

switch (msg_header->msg_type) {
case ReplicationMessageType::CREATE_SHARD_MSG: {
auto list_shard_result = _home_object->shard_manager()->list_shards(msg_header->repl_group_id).get();
if (!list_shard_result) {
LOGWARN("list shards failed with unknown pg {}", msg_header->repl_group_id);
break;
}

switch (msg_header->message_type) {
case ReplicationMessageType::SHARD_MESSAGE: {
_home_object->on_rollback_shard_msg(lsn, header, key, ctx);
if (list_shard_result.value().empty()) {
// The pg is empty (it has no shards yet), so we leave the decision to the HeapChunkSelector: it selects
// the pdev with the most available space and then picks one chunk on that pdev.
} else {
auto chunk_id = _home_object->get_shard_chunk(list_shard_result.value().front().id);
RELEASE_ASSERT(!!chunk_id, "unknown shard id to get binded chunk");
// TODO: HS will add a new interface to get an alloc hint based on a reference chunk;
// once it exists, call that interface here to return the alloc hint.
}
break;
}
case ReplicationMessageType::PG_MESSAGE:
case ReplicationMessageType::BLOB_MESSAGE:

case ReplicationMessageType::SEAL_SHARD_MSG:
case ReplicationMessageType::PUT_BLOB_MSG:
case ReplicationMessageType::DEL_BLOB_MSG:
default: {
break;
}
}

return homestore::blk_alloc_hints();
}

void ReplicationStateMachine::on_replica_stop() {}
Expand Down
59 changes: 42 additions & 17 deletions src/lib/homestore/replication_state_machine.hpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
#pragma once

#include "homeobject.hpp"
#include "mocks/repl_service.h"

#include <homestore/replication_service.hpp>
#include <homestore/replication/repl_dev.h>

namespace homeobject {

class HomeObjectImpl;

class ReplicationStateMachine : public home_replication::ReplicaSetListener {
class HOReplServiceCallbacks : public homestore::ReplServiceCallbacks {
HSHomeObject* _home_object{nullptr};

public:
explicit HOReplServiceCallbacks(HSHomeObject* home_object) : _home_object(home_object) {}
virtual ~HOReplServiceCallbacks() = default;
virtual std::unique_ptr< homestore::ReplDevListener >
on_repl_dev_init(homestore::cshared< homestore::ReplDev >& rs);
};

class ReplicationStateMachine : public homestore::ReplDevListener {
public:
explicit ReplicationStateMachine(HSHomeObject* home_object) : _home_object(home_object) {}
explicit ReplicationStateMachine(HSHomeObject* home_object, homestore::cshared< homestore::ReplDev >& repl_dev) :
_home_object(home_object), _repl_dev(repl_dev) {}

virtual ~ReplicationStateMachine() = default;

~ReplicationStateMachine() = default;
/// @brief Called when the log entry has been committed in the replica set.
///
/// This function is called from a dedicated commit thread which is different from the original thread calling
Expand All @@ -20,13 +34,13 @@ class ReplicationStateMachine : public home_replication::ReplicaSetListener {
/// @param lsn - The log sequence number
/// @param header - Header originally passed with replica_set::write() api
/// @param key - Key originally passed with replica_set::write() api
/// @param pbas - List of pbas where data is written to the storage engine.
/// @param blkids - List of blkids where data is written to the storage engine.
/// @param ctx - User context passed as part of the replica_set::write() api
///
virtual void on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
const home_replication::pba_list_t& pbas, void* ctx);
virtual void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
homestore::MultiBlkId const& blkids, void* ctx) override;

/// @brief Called when the log entry has been received by the replica set.
/// @brief Called when the log entry has been received by the replica dev.
///
/// On recovery, this is called from a random worker thread before the raft server is started. It is
/// guaranteed to be serialized in log index order.
Expand All @@ -41,10 +55,10 @@ class ReplicationStateMachine : public home_replication::ReplicaSetListener {
/// currently in pre-commit, but yet to be committed.
///
/// @param lsn - The log sequence number
/// @param header - Header originally passed with replica_set::write() api
/// @param key - Key originally passed with replica_set::write() api
/// @param ctx - User contenxt passed as part of the replica_set::write() api
virtual void on_pre_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx);
/// @param header - Header originally passed with repl_dev::write() api
/// @param key - Key originally passed with repl_dev::write() api
/// @param ctx - User context passed as part of the repl_dev::write() api
virtual void on_pre_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx) override;

/// @brief Called when the log entry has been rolled back by the replica set.
///
Expand All @@ -56,16 +70,27 @@ class ReplicationStateMachine : public home_replication::ReplicaSetListener {
/// NOTE: Listener should do the free any resources created as part of pre-commit.
///
/// @param lsn - The log sequence number getting rolled back
/// @param header - Header originally passed with replica_set::write() api
/// @param key - Key originally passed with replica_set::write() api
/// @param ctx - User contenxt passed as part of the replica_set::write() api
virtual void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx);
/// @param header - Header originally passed with repl_dev::write() api
/// @param key - Key originally passed with repl_dev::write() api
/// @param ctx - User context passed as part of the repl_dev::write() api
virtual void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx) override;

/// @brief Called when replication module is trying to allocate a block to write the value
///
/// This function can be called both on leader and follower when it is trying to allocate a block to write the
/// value. Caller is expected to provide hints for allocation based on the header supplied as part of original
/// write. A caller that does not care about the hints can return a default blk_alloc_hints.
///
/// @param header Header originally passed with repl_dev::write() api on the leader
/// @return Expected to return blk_alloc_hints for this write
virtual homestore::blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, void* user_ctx) override;

/// @brief Called when the replica set is being stopped
virtual void on_replica_stop();
virtual void on_replica_stop() override;

private:
HSHomeObject* _home_object;
homestore::cshared< homestore::ReplDev > _repl_dev;
};

} // namespace homeobject
Loading

0 comments on commit 68e3650

Please sign in to comment.