Skip to content

Commit

Permalink
SDSTOR-11601 add create_shard implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
zichanglai committed Sep 1, 2023
1 parent ab66ca9 commit ea7f37d
Show file tree
Hide file tree
Showing 9 changed files with 299 additions and 12 deletions.
6 changes: 6 additions & 0 deletions src/include/homeobject/shard_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,15 @@ struct ShardInfo {
};
using InfoList = std::list< ShardInfo >;

struct ShardContextsInPG {
uint64_t sequence_num{0};
std::unordered_set<shard_id> shards;
};

class ShardManager : public Manager< ShardError > {
public:
static uint64_t max_shard_size(); // Static function forces runtime evaluation.
static uint64_t max_shard_num_in_pg();

virtual AsyncResult< ShardInfo > get_shard(shard_id id) const = 0;
virtual AsyncResult< InfoList > list_shards(pg_id id) const = 0;
Expand Down
11 changes: 11 additions & 0 deletions src/lib/homeobject_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,19 @@

#include <boost/uuid/uuid_io.hpp>

#include <homestore/meta_service.hpp>

namespace homeobject {

const std::string HomeObjectImpl::s_shard_info_sub_type = "shard_info";

extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application) {
LOGINFOMOD(homeobject, "Initializing HomeObject");
//register some callbacks for metadata recovery;
homestore::hs()->meta_service().register_handler(HomeObjectImpl::s_shard_info_sub_type,
HomeObjectImpl::on_shard_meta_blk_found,
nullptr,
true);
auto instance = std::make_shared< HomeObjectImpl >(std::move(application));
instance->init_repl_svc();
return instance;
Expand All @@ -22,4 +31,6 @@ void HomeObjectImpl::init_repl_svc() {
}
}

void HomeObjectImpl::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {}

} // namespace homeobject
27 changes: 21 additions & 6 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@
#include "homeobject/pg_manager.hpp"
#include "homeobject/shard_manager.hpp"

#include <homestore/homestore.hpp>

#include <sisl/logging/logging.h>
#include <home_replication/repl_service.h>

namespace homestore {
struct meta_blk;
}

namespace homeobject {

class HomeObjectImpl : public HomeObject,
Expand All @@ -17,8 +23,7 @@ class HomeObjectImpl : public HomeObject,
public std::enable_shared_from_this< HomeObjectImpl > {

std::mutex _repl_lock;
std::shared_ptr< home_replication::ReplicationService > _repl_svc;

std::shared_ptr<homestore::ReplicationService> _repl_svc;
protected:
peer_id _our_id;

Expand All @@ -27,9 +32,15 @@ class HomeObjectImpl : public HomeObject,

/// This simulates the MetaBlkSvc thats used within real HomeObject
mutable std::mutex _pg_lock;
std::map< pg_id, std::unordered_set< shard_id > > _pg_map;
///

std::map< pg_id, ShardContextsInPG > _pg_map;

mutable std::mutex _shard_mutex;
std::map<shard_id, ShardInfo> _shard_map;
private:
bool check_if_pg_exist(pg_id pg);
shard_id generate_new_shard_id(pg_id pg);
shard_id make_new_shard_id(pg_id pg, uint64_t sequence_num);
std::string prepare_create_shard_message(pg_id pg, shard_id new_shard_id, uint64_t shard_size);
public:
explicit HomeObjectImpl(std::weak_ptr< HomeObjectApplication >&& application) :
_application(std::move(application)) {}
Expand All @@ -54,7 +65,11 @@ class HomeObjectImpl : public HomeObject,
ShardManager::AsyncResult< InfoList > list_shards(pg_id pg) const override;
ShardManager::AsyncResult< ShardInfo > create_shard(pg_id pg_owner, uint64_t size_bytes) override;
ShardManager::AsyncResult< ShardInfo > seal_shard(shard_id id) override;

void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
sisl::sg_list const& value, void* user_ctx);
static void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size);
static const std::string s_shard_info_sub_type;

/// BlobManager
BlobManager::AsyncResult< blob_id > put(shard_id shard, Blob&&) override;
BlobManager::AsyncResult< Blob > get(shard_id shard, blob_id const& blob, uint64_t off,
Expand Down
2 changes: 1 addition & 1 deletion src/lib/pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ PGManager::NullAsyncResult HomeObjectImpl::create_pg(PGInfo&& pg_info) {
return folly::makeUnexpected(PGError::INVALID_ARG);

auto lg = std::scoped_lock(_pg_lock);
auto [it, _] = _pg_map.try_emplace(pg_info.id, std::unordered_set< shard_id >());
auto [it, _] = _pg_map.try_emplace(pg_info.id, ShardContextsInPG());
RELEASE_ASSERT(_pg_map.end() != it, "Unknown map insert error!");
return folly::Unit();
});
Expand Down
26 changes: 26 additions & 0 deletions src/lib/replication_message.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include <sisl/utility/enum.hpp>
#include <isa-l/crc.h>

namespace homeobject {

ENUM(ReplicationMessageType, uint16_t, PG_MESSAGE = 0, SHARD_MESSAGE, BLOB_MESSAGE, UNKNOWN_MESSAGE);

// magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum'
static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34;

struct ReplicationMessageHeader {
uint64_t magic_num;
ReplicationMessageType message_type;
uint32_t message_size;
uint32_t message_crc;
uint8_t reserved_pad[2]{};
uint32_t header_crc;
uint32_t calculate_crc() const {
return crc32_ieee(0, r_cast<const unsigned char*>(this), sizeof(*this) - sizeof(header_crc));
}
};


}
32 changes: 32 additions & 0 deletions src/lib/replication_state_machine.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#include "replication_state_machine.hpp"
#include "homeobject_impl.hpp"

namespace homeobject {

void ReplicationStateMachine::on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
blkid_list_t const& blkids,void* ctx) {
const ReplicationMessageHeader* msg_header = r_cast<const ReplicationMessageHeader*>(header.bytes);
//TODO: read data from pba or get data from replication context(prefered)
sisl::sg_list value;

switch (msg_header->message_type) {
case SHARD_MESSAGE: {
home_object->on_shard_message_commit(lsn, header, key, value, ctx);
}
case PG_MESSAGE:
case BLOB_MESSAGE:
default: {
break;
}
}
}

void ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {}

void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {}

blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header) {}

void ReplicationStateMachine::on_replica_stop() {}

}
78 changes: 78 additions & 0 deletions src/lib/replication_state_machine.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#pragma once

#include "homestore/replication/repl_dev.h"


namespace homeobject {

class HomeObjectImpl;

class ReplicationStateMachine : public homestore::ReplicaDevListener {
public:
/// @brief Called when the log entry has been committed in the replica set.
///
/// This function is called from a dedicated commit thread which is different from the original thread calling
/// replica_set::write(). There is only one commit thread, and lsn is guaranteed to be monotonically increasing.
///
/// @param lsn - The log sequence number
/// @param header - Header originally passed with replica_set::write() api
/// @param key - Key originally passed with replica_set::write() api
/// @param pbas - List of pbas where data is written to the storage engine.
/// @param ctx - User contenxt passed as part of the replica_set::write() api
///
virtual void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, blkid_list_t const& blkids,
void* ctx);

/// @brief Called when the log entry has been received by the replica dev.
///
/// On recovery, this is called from a random worker thread before the raft server is started. It is
/// guaranteed to be serialized in log index order.
///
/// On the leader, this is called from the same thread that replica_set::write() was called.
///
/// On the follower, this is called when the follower has received the log entry. It is guaranteed to be serialized
/// in log sequence order.
///
/// NOTE: Listener can choose to ignore this pre commit, however, typical use case of maintaining this is in-case
/// replica set needs to support strong consistent reads and follower needs to ignore any keys which are not being
/// currently in pre-commit, but yet to be committed.
///
/// @param lsn - The log sequence number
/// @param header - Header originally passed with replica_set::write() api
/// @param key - Key originally passed with replica_set::write() api
/// @param ctx - User contenxt passed as part of the replica_set::write() api
virtual void on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx);

/// @brief Called when the log entry has been rolled back by the replica set.
///
/// This function is called on followers only when the log entry is going to be overwritten. This function is called
/// from a random worker thread, but is guaranteed to be serialized.
///
/// For each log index, it is guaranteed that either on_commit() or on_rollback() is called but not both.
///
/// NOTE: Listener should do the free any resources created as part of pre-commit.
///
/// @param lsn - The log sequence number getting rolled back
/// @param header - Header originally passed with replica_set::write() api
/// @param key - Key originally passed with replica_set::write() api
/// @param ctx - User contenxt passed as part of the replica_set::write() api
virtual void on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx);

/// @brief Called when replication module is trying to allocate a block to write the value
///
/// This function can be called both on leader and follower when it is trying to allocate a block to write the
/// value. Callee is expected to provide hints for allocation based on the header supplied as part of original
/// write. In cases where callee don't care about the hints can return default blk_alloc_hints.
///
/// @param header Header originally passed with repl_dev::write() api on the leader
/// @return Expected to return blk_alloc_hints for this write
virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header);

/// @brief Called when the replica set is being stopped
virtual void on_replica_stop();

private:
HomeObjectImpl* home_object;
};

}
125 changes: 122 additions & 3 deletions src/lib/shard_manager.cpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,100 @@
#include "homeobject_impl.hpp"
#include "replication_message.hpp"

#include <homestore/meta_service.hpp>

namespace homeobject {

uint64_t ShardManager::max_shard_size() { return Gi; }

uint64_t ShardManager::max_shard_num_in_pg() {return ((uint64_t)0x01) << 48;}

std::shared_ptr< ShardManager > HomeObjectImpl::shard_manager() {
init_repl_svc();
return shared_from_this();
}

bool HomeObjectImpl::check_if_pg_exist(pg_id pg) {
std::scoped_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg);
return (iter == _pg_map.end()) ? false : true;
}

shard_id HomeObjectImpl::generate_new_shard_id(pg_id pg) {
std::scoped_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg);
if (iter == _pg_map.end()) {
iter = _pg_map.insert(std::make_pair(pg, ShardContextsInPG())).first;
}
auto new_sequence_num = iter->second.sequence_num++;
RELEASE_ASSERT(new_sequence_num < ShardManager::max_shard_num_in_pg(), "new shard id must be less than ShardManager::max_shard_num_in_pg()");
return make_new_shard_id(pg, new_sequence_num);
}

shard_id HomeObjectImpl::make_new_shard_id(pg_id pg, uint64_t sequence_num) {
return ((uint64_t)pg << 48) | sequence_num;
}

std::string HomeObjectImpl::prepare_create_shard_message(pg_id pg, shard_id new_shard_id, uint64_t shard_size) {
nlohmann::json j;
j["pg_id"] = pg;
j["shard_id"] = new_shard_id;
j["state"] = ShardInfo::State::OPEN;
j["capacity"] = shard_size;
return j.dump();
}

ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::create_shard(pg_id pg_owner, uint64_t size_bytes) {
if (0 == size_bytes || max_shard_size() < size_bytes) return folly::makeUnexpected(ShardError::INVALID_ARG);
return folly::makeUnexpected(ShardError::UNKNOWN_PG);
if (0 == size_bytes || max_shard_size() < size_bytes)
return folly::makeUnexpected(ShardError::INVALID_ARG);

bool pg_exist = check_if_pg_exist(pg_owner);
if (!pg_exist) {
LOGWARN("failed to create shard with non-exist pg [{}]", pg_owner);
return folly::makeUnexpected(ShardError::UNKNOWN_PG);
}

auto repl_dev = _repl_svc->get_replica_dev(fmt::format("{}", pg_owner));
if (repl_dev == nullptr) {
LOGWARN("failed to get replica set instance for pg [{}]", pg_owner);
return folly::makeUnexpected(ShardError::UNKNOWN_PG);
}

if (!repl_dev->is_leader()) {
LOGWARN("pg [{}] replica set is not a leader, please retry other pg members", pg_owner);
return folly::makeUnexpected(ShardError::NOT_LEADER);
}

auto new_shard_id = generate_new_shard_id(pg_owner);
std::string create_shard_message = prepare_create_shard_message(pg_owner, new_shard_id, size_bytes);
ReplicationMessageHeader header;
header.magic_num = HOMEOBJECT_REPLICATION_MAGIC;
header.message_type = ReplicationMessageType::SHARD_MESSAGE;
header.message_size = create_shard_message.size();
header.message_crc = crc32_ieee(0, r_cast< const uint8_t* >(create_shard_message.c_str()), create_shard_message.size());
header.header_crc = header.calculate_crc();
sisl::sg_list value;
value.size = create_shard_message.size();
value.iovs.emplace_back(iovec{static_cast<void*>(const_cast<char*>(create_shard_message.c_str())),
create_shard_message.size()});

//replicate this create shard message to PG members;
folly::Promise<ShardManager::Result<ShardInfo> > *promise = new folly::Promise<ShardManager::Result<ShardInfo> >();
repl_dev->async_alloc_write(sisl::blob(uintptr_cast(&header), sizeof(header)),
sisl::blob(),
value,
promise);

return promise->getSemiFuture().via(folly::getGlobalCPUExecutor())
.thenValue([promise, this](ShardManager::Result<ShardInfo> const& v) -> ShardManager::Result<ShardInfo> {
std::unique_ptr<folly::Promise<ShardManager::Result<ShardInfo>> > promise_guard(promise);
if (bool(v)) {
const auto& shard_info = *v;
std::scoped_lock lock_guard(_pg_lock, _shard_mutex);
_pg_map[shard_info.placement_group].shards.insert(shard_info.id);
_shard_map[shard_info.id] = shard_info;
}
return v;
});
}

ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::seal_shard(shard_id id) {
Expand All @@ -26,4 +109,40 @@ ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::get_shard(shard_id id) co
return folly::makeUnexpected(ShardError::UNKNOWN_SHARD);
}

void HomeObjectImpl::on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
sisl::sg_list const& value, void* user_ctx) {
const ReplicationMessageHeader* msg_header = r_cast<const ReplicationMessageHeader*>(header.bytes);
folly::Promise<ShardManager::Result<ShardInfo> > *promise = r_cast<folly::Promise<ShardManager::Result<ShardInfo> >*> (user_ctx);
if (msg_header->header_crc != msg_header->calculate_crc()) {
LOGWARN("replication message header is corrupted with crc error, lsn:{}", lsn);
promise->setValue(folly::makeUnexpected(ShardError::INVALID_ARG));
return;
}

std::string string_value;
for (auto & iovec: value.iovs) {
string_value.append(static_cast<char*>(iovec.iov_base), iovec.iov_len);
}

auto crc = crc32_ieee(0, r_cast< const uint8_t* >(string_value.c_str()), string_value.size());
if (msg_header->message_crc != crc) {
LOGWARN("replication message body is corrupted with crc error, lsn:{}", lsn);
promise->setValue(folly::makeUnexpected(ShardError::INVALID_ARG));
return;
}

//check is passed and it is time to submit shard info to homestore meta blk;
void* cookie = nullptr;
homestore::hs()->meta_service().add_sub_sb(HomeObjectImpl::s_shard_info_sub_type, r_cast<const uint8_t*>(string_value.c_str()), string_value.size(), cookie);

//prepare for return shard info;
auto shard_info_json = nlohmann::json::parse(string_value);
ShardInfo shard_info;
shard_info.id = shard_info_json["shard_id"].get<shard_id>();
shard_info.placement_group = shard_info_json["pg_id"].get<pg_id>();
shard_info.state = static_cast<ShardInfo::State>(shard_info_json["state"].get<int>());
shard_info.total_capacity_bytes = shard_info_json["capacity"].get<uint64_t>();
promise->setValue(ShardManager::Result<ShardInfo>(shard_info));
}

} // namespace homeobject
Loading

0 comments on commit ea7f37d

Please sign in to comment.