Skip to content

Commit

Permalink
add metablk recovery support for create_shard
Browse files Browse the repository at this point in the history
  • Loading branch information
zichanglai committed Sep 11, 2023
1 parent 8d103f8 commit 0af7f2e
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 150 deletions.
8 changes: 0 additions & 8 deletions src/include/homeobject/pg_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <sisl/utility/enum.hpp>

#include "common.hpp"
#include "shard_manager.hpp"

namespace homeobject {

Expand All @@ -28,13 +27,6 @@ struct PGInfo {
mutable MemberSet members;
};

struct PG {
explicit PG(PGInfo info) : pg_info(std::move(info)) {}
PGInfo pg_info;
uint64_t next_sequence_num_for_new_shard{0};
CShardInfoList shards;
};

class PGManager : public Manager< PGError > {
public:
virtual NullAsyncResult create_pg(PGInfo&& pg_info) = 0;
Expand Down
4 changes: 1 addition & 3 deletions src/include/homeobject/shard_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ struct ShardInfo {
std::optional< peer_id > current_leader{std::nullopt};
};

using InfoList = std::list<ShardInfo>;
using CShardInfo = std::shared_ptr<ShardInfo>;
using CShardInfoList = std::list< CShardInfo >;
using InfoList = std::list< ShardInfo >;

class ShardManager : public Manager< ShardError > {
public:
Expand Down
29 changes: 22 additions & 7 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,6 @@

#include <sisl/logging/logging.h>


template <>
struct std::hash< homeobject::ShardInfo > {
std::size_t operator()(homeobject::ShardInfo const& i) const noexcept { return std::hash< uint64_t >()(i.id); }
};

namespace home_replication {
class ReplicationService;
}
Expand All @@ -22,6 +16,26 @@ namespace homeobject {
/// Orders two shards by id only; no other field takes part in the comparison.
inline bool operator<(ShardInfo const& lhs, ShardInfo const& rhs) {
    return lhs.id < rhs.id;
}

/// Two shards compare equal when their ids match; no other field is inspected.
inline bool operator==(ShardInfo const& lhs, ShardInfo const& rhs) {
    return lhs.id == rhs.id;
}

/// Extra per-shard data kept next to ShardInfo.
struct ShardInfoExt {
    /// Chunk identifier associated with the shard. Defaults to 0 so that a
    /// default-constructed instance never carries an indeterminate value.
    uint16_t chunk_id{0};
};

struct Shard {
explicit Shard(ShardInfo info) : info(std::move(info)) {}
ShardInfo info;
ShardInfoExt ext_info;
};

using ShardPtr = std::shared_ptr< Shard >;
using ShardPtrList = std::list< ShardPtr >;

struct PG {
explicit PG(PGInfo info) : pg_info(std::move(info)) {}
PGInfo pg_info;
uint64_t shard_sequence_num{0};
ShardPtrList shards;
};

class HomeObjectImpl : public HomeObject,
public BlobManager,
public PGManager,
Expand Down Expand Up @@ -54,7 +68,7 @@ class HomeObjectImpl : public HomeObject,
std::map< pg_id, PG > _pg_map;

mutable std::shared_mutex _shard_lock;
std::map< shard_id, CShardInfo > _shard_map;
std::map < shard_id, ShardPtr > _shard_map;
///
public:
explicit HomeObjectImpl(std::weak_ptr< HomeObjectApplication >&& application) :
Expand Down Expand Up @@ -84,6 +98,7 @@ class HomeObjectImpl : public HomeObject,
ShardManager::AsyncResult< ShardInfo > create_shard(pg_id pg_owner, uint64_t size_bytes) final;
ShardManager::AsyncResult< InfoList > list_shards(pg_id pg) const final;
ShardManager::AsyncResult< ShardInfo > seal_shard(shard_id id) final;
uint64_t get_current_timestamp();

static const std::string s_shard_info_sub_type;
/// BlobManager
Expand Down
1 change: 1 addition & 0 deletions src/lib/homestore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE
homeobject.cpp
blob_manager.cpp
shard_manager.cpp
replication_state_machine.cpp
$<TARGET_OBJECTS:${PROJECT_NAME}_core>
)
target_link_libraries("${PROJECT_NAME}_homestore"
Expand Down
34 changes: 9 additions & 25 deletions src/lib/homestore/homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace homeobject {

const std::string HomeObjectImpl::s_shard_info_sub_type = "shard_info";
const std::string HSHomeObject::s_shard_info_sub_type = "shard_info";

extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application) {
LOGINFOMOD(homeobject, "Initializing HomeObject");
Expand All @@ -30,7 +30,8 @@ void HSHomeObject::init_homestore() {
ioenvironment.with_iomgr(iomgr::iomgr_params{.num_threads = app->threads(), .is_spdk = app->spdk_mode()});

//register some callbacks for metadata recovery;
HomeStore::instance()->meta_service().register_handler(HomeObjectImpl::s_shard_info_sub_type,
using namespace homestore;
HomeStore::instance()->meta_service().register_handler(HSHomeObject::s_shard_info_sub_type,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_shard_meta_blk_found(mblk, buf, size);
},
Expand All @@ -47,7 +48,6 @@ void HSHomeObject::init_homestore() {
}

/// TODO need Repl service eventually yeah?
using namespace homestore;
uint32_t services = HS_SERVICE::META | HS_SERVICE::LOG_REPLICATED | HS_SERVICE::LOG_LOCAL | HS_SERVICE::DATA;

bool need_format = HomeStore::instance()->start(
Expand Down Expand Up @@ -77,38 +77,22 @@ void HomeObjectImpl::init_repl_svc() {
}
}


// Destructor: shuts down the HomeStore singleton, resets the singleton
// instance, and finally stops the iomanager.
HSHomeObject::~HSHomeObject() {
homestore::HomeStore::instance()->shutdown();
homestore::HomeStore::reset_instance();
// The iomanager is stopped only after HomeStore has been shut down and reset.
iomanager.stop();
}

void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
const char* shard_info_json_str = r_cast<const char*>(buf.bytes());
auto shard_info_json = nlohmann::json::parse(shard_info_json_str, shard_info_json_str + size);

CShardInfo shard_info = std::make_shared<ShardInfo>();
shard_info->id = shard_info_json["shard_id"].get<shard_id>();
shard_info->placement_group = shard_info_json["pg_id"].get<pg_id>();
shard_info->state = static_cast<ShardInfo::State>(shard_info_json["state"].get<int>());
shard_info->total_capacity_bytes = shard_info_json["capacity"].get<uint64_t>();
std::string shard_info_str;
shard_info_str.append(r_cast<const char*>(buf.bytes()), size);

std::scoped_lock lock_guard(_pg_lock, _shard_lock);
if (shard_info->state == ShardInfo::State::OPEN) {
auto shard = deserialize_shard_info(shard_info_str);
if (shard->info.state == ShardInfo::State::OPEN) {
// create shard;
auto pg_iter = _pg_map.find(shard_info->placement_group);
RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info");
pg_iter->second.shards.push_back(shard_info);
auto sequence_num = get_sequence_num_from_shard_id(shard_info->id);
if (sequence_num > pg_iter->second.next_sequence_num_for_new_shard) {
pg_iter->second.next_sequence_num_for_new_shard = sequence_num;
}
_shard_map[shard_info->id] = shard_info;
do_commit_new_shard(shard);
} else {
auto shard_iter = _shard_map.find(shard_info->id);
RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info");
shard_iter->second = shard_info;
do_commit_seal_shard(shard);
}
}

Expand Down
13 changes: 11 additions & 2 deletions src/lib/homestore/homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,25 @@ class HSHomeObject : public HomeObjectImpl {
shard_id generate_new_shard_id(pg_id pg);
shard_id make_new_shard_id(pg_id pg, uint64_t sequence_num) const;
uint64_t get_sequence_num_from_shard_id(uint64_t shard_id);
std::string prepare_create_shard_message(pg_id pg, shard_id new_shard_id, uint64_t shard_size) const;
std::string serialize_shard_info(ShardPtr shard) const;
ShardPtr deserialize_shard_info(const std::string& shard_info_str) const;
void do_commit_new_shard(ShardPtr shard_info);
void do_commit_seal_shard(ShardPtr shard_info);
public:
using HomeObjectImpl::HomeObjectImpl;
~HSHomeObject();

void init_homestore();

static const std::string s_shard_info_sub_type;
void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size);

void on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
void* user_ctx);
void on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
void* user_ctx);
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
sisl::sg_list const& value, void* user_ctx);
void* user_ctx);
};

} // namespace homeobject
13 changes: 9 additions & 4 deletions src/lib/homestore/replication_message.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "homeobject/common.hpp"

#include <sisl/utility/enum.hpp>
#include <isa-l/crc.h>

Expand All @@ -9,16 +11,19 @@ ENUM(ReplicationMessageType, uint16_t, PG_MESSAGE = 0, SHARD_MESSAGE, BLOB_MESSA

// magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum'
static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34;
static constexpr uint32_t HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1 = 0x01;
static constexpr uint32_t init_crc32 = 0;

// Fixed-layout header placed in front of a replicated message: a magic number,
// protocol version, message type, payload size/CRC, reserved bytes, and a CRC
// over the header itself.
// NOTE(review): this listing shows both sides of a diff, so some fields and the
// return statement below appear in an old and a new form.
struct ReplicationMessageHeader {
uint64_t magic_num{HOMEOBJECT_REPLICATION_MAGIC};
uint32_t protocol_version{HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1};
// NOTE(review): unlike magic_num and protocol_version, message_type (and the
// size/crc fields below) have no default initializer — confirm every producer sets them.
ReplicationMessageType message_type;
uint32_t message_size;
uint32_t message_crc;
uint8_t reserved_pad[2]{};
uint32_t payload_size;
uint32_t payload_crc;
uint8_t reserved_pad[6]{};
uint32_t header_crc;
// Computes a CRC32 (crc32_ieee) over the first sizeof(*this) - sizeof(header_crc) bytes of this object.
// NOTE(review): that byte count equals the offset of header_crc only if the struct has
// no padding before or after header_crc. No packing directive is visible here and the
// first member is a uint64_t, so confirm the layout (or use offsetof); otherwise the
// range may cover header_crc itself and unspecified padding bytes.
uint32_t calculate_crc() const {
return crc32_ieee(0, r_cast<const unsigned char*>(this), sizeof(*this) - sizeof(header_crc));
return crc32_ieee(init_crc32, r_cast<const unsigned char*>(this), sizeof(*this) - sizeof(header_crc));
}
};

Expand Down
52 changes: 39 additions & 13 deletions src/lib/homestore/replication_state_machine.cpp
Original file line number Diff line number Diff line change
@@ -1,32 +1,58 @@
#include "replication_message.hpp"
#include "replication_state_machine.hpp"
#include "homeobject_impl.hpp"

namespace homeobject {

// Commit callback: logs the LSN, reinterprets header.bytes as a
// ReplicationMessageHeader, and forwards SHARD_MESSAGE entries to the home
// object. PG_MESSAGE, BLOB_MESSAGE and any other type are ignored.
// NOTE(review): no size, magic or CRC check on the header is visible here before
// message_type is read — confirm validation happens upstream.
void ReplicationStateMachine::on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
blkid_list_t const& blkids,void* ctx) {
void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
const home_replication::pba_list_t& pbas, void* ctx) {
LOGINFO("applying raft log commit with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast<const ReplicationMessageHeader*>(header.bytes);
//TODO: read data from pba or get data from replication context(preferred)
sisl::sg_list value;

switch (msg_header->message_type) {
case SHARD_MESSAGE: {
home_object->on_shard_message_commit(lsn, header, key, value, ctx);
case ReplicationMessageType::SHARD_MESSAGE: {
_home_object->on_shard_message_commit(lsn, header, key,ctx);
break;
}
case PG_MESSAGE:
case BLOB_MESSAGE:
case ReplicationMessageType::PG_MESSAGE:
case ReplicationMessageType::BLOB_MESSAGE:
default: {
// No commit handling for these message types here.
break;
}
}
}

// Pre-commit callback: logs the LSN, reinterprets header.bytes as a
// ReplicationMessageHeader, and forwards SHARD_MESSAGE entries to
// on_pre_commit_shard_msg. Other message types are ignored.
// NOTE(review): no header size/magic/CRC check is visible here — confirm it happens upstream.
void ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {}
void ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {
LOGINFO("on_pre_commit with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast<const ReplicationMessageHeader*>(header.bytes);

void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {}
switch (msg_header->message_type) {
case ReplicationMessageType::SHARD_MESSAGE: {
_home_object->on_pre_commit_shard_msg(lsn, header, key,ctx);
break;
}
case ReplicationMessageType::PG_MESSAGE:
case ReplicationMessageType::BLOB_MESSAGE:
default: {
// No pre-commit handling for these message types here.
break;
}
}
}

blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header) {
return blk_alloc_hints();
void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {
LOGINFO("rollback with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast<const ReplicationMessageHeader*>(header.bytes);

switch (msg_header->message_type) {
case ReplicationMessageType::SHARD_MESSAGE: {
_home_object->on_rollback_shard_msg(lsn, header, key,ctx);
break;
}
case ReplicationMessageType::PG_MESSAGE:
case ReplicationMessageType::BLOB_MESSAGE:
default: {
break;
}
}
}

void ReplicationStateMachine::on_replica_stop() {}
Expand Down
30 changes: 10 additions & 20 deletions src/lib/homestore/replication_state_machine.hpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#pragma once

#include "homestore/replication/repl_dev.h"

#include "homeobject.hpp"
#include "mocks/repl_service.h"

namespace homeobject {

class HomeObjectImpl;

class ReplicationStateMachine : public homestore::ReplicaDevListener {
class ReplicationStateMachine : public home_replication::ReplicaSetListener {
public:
/// @brief Called when the log entry has been committed in the replica set.
///
Expand All @@ -20,10 +20,10 @@ class ReplicationStateMachine : public homestore::ReplicaDevListener {
/// @param pbas - List of pbas where data is written to the storage engine.
/// @param ctx - User context passed as part of the replica_set::write() api
///
virtual void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, blkid_list_t const& blkids,
void* ctx);
virtual void on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
const home_replication::pba_list_t& pbas, void* ctx);

/// @brief Called when the log entry has been received by the replica dev.
/// @brief Called when the log entry has been received by the replica set.
///
/// On recovery, this is called from a random worker thread before the raft server is started. It is
/// guaranteed to be serialized in log index order.
Expand All @@ -41,7 +41,7 @@ class ReplicationStateMachine : public homestore::ReplicaDevListener {
/// @param header - Header originally passed with replica_set::write() api
/// @param key - Key originally passed with replica_set::write() api
/// @param ctx - User context passed as part of the replica_set::write() api
virtual void on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx);
virtual void on_pre_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx);

/// @brief Called when the log entry has been rolled back by the replica set.
///
Expand All @@ -56,23 +56,13 @@ class ReplicationStateMachine : public homestore::ReplicaDevListener {
/// @param header - Header originally passed with replica_set::write() api
/// @param key - Key originally passed with replica_set::write() api
/// @param ctx - User context passed as part of the replica_set::write() api
virtual void on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx);

/// @brief Called when replication module is trying to allocate a block to write the value
///
/// This function can be called both on leader and follower when it is trying to allocate a block to write the
/// value. Callee is expected to provide hints for allocation based on the header supplied as part of original
/// write. In cases where callee don't care about the hints can return default blk_alloc_hints.
///
/// @param header Header originally passed with repl_dev::write() api on the leader
/// @return Expected to return blk_alloc_hints for this write
virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header);
virtual void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx);

/// @brief Called when the replica set is being stopped
virtual void on_replica_stop();
virtual void on_replica_stop() = 0;

private:
HomeObjectImpl* home_object;
HSHomeObject* _home_object;
};

}
Loading

0 comments on commit 0af7f2e

Please sign in to comment.