Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDSTOR-11601 add create_shard implementation #34

Merged
merged 11 commits into from
Sep 19, 2023
2 changes: 2 additions & 0 deletions src/include/homeobject/shard_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ struct ShardInfo {
uint64_t deleted_capacity_bytes;
std::optional< peer_id > current_leader{std::nullopt};
};

using InfoList = std::list< ShardInfo >;

class ShardManager : public Manager< ShardError > {
public:
static uint64_t max_shard_size(); // Static function forces runtime evaluation.
static uint64_t max_shard_num_in_pg();
szmyd marked this conversation as resolved.
Show resolved Hide resolved

virtual AsyncResult< ShardInfo > get_shard(shard_id id) const = 0;
virtual AsyncResult< InfoList > list_shards(pg_id id) const = 0;
Expand Down
8 changes: 4 additions & 4 deletions src/lib/blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,23 @@ BlobManager::AsyncResult< Blob > HomeObjectImpl::get(shard_id shard, blob_id con
uint64_t len) const {
return _get_shard(shard).thenValue([this, blob](auto const e) -> BlobManager::Result< Blob > {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
return _get_blob(e.value(), blob);
return _get_blob(e.value().info, blob);
});
}

BlobManager::AsyncResult< blob_id > HomeObjectImpl::put(shard_id shard, Blob&& blob) {
return _get_shard(shard).thenValue(
[this, blob = std::move(blob)](auto const e) mutable -> BlobManager::Result< blob_id > {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
if (ShardInfo::State::SEALED == e.value().state) return folly::makeUnexpected(BlobError::INVALID_ARG);
return _put_blob(e.value(), std::move(blob));
if (ShardInfo::State::SEALED == e.value().info.state) return folly::makeUnexpected(BlobError::INVALID_ARG);
return _put_blob(e.value().info, std::move(blob));
});
}

BlobManager::NullAsyncResult HomeObjectImpl::del(shard_id shard, blob_id const& blob) {
return _get_shard(shard).thenValue([this, blob](auto const e) mutable -> BlobManager::NullResult {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
return _del_blob(e.value(), blob);
return _del_blob(e.value().info, blob);
});
}

Expand Down
44 changes: 28 additions & 16 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@

#include <sisl/logging/logging.h>

template <>
struct std::hash< homeobject::ShardInfo > {
std::size_t operator()(homeobject::ShardInfo const& i) const noexcept { return std::hash< uint64_t >()(i.id); }
};

namespace home_replication {
class ReplicationService;
}
Expand All @@ -21,15 +16,28 @@ namespace homeobject {
inline bool operator<(ShardInfo const& lhs, ShardInfo const& rhs) { return lhs.id < rhs.id; }
inline bool operator==(ShardInfo const& lhs, ShardInfo const& rhs) { return lhs.id == rhs.id; }

struct Shard {
explicit Shard(ShardInfo info) : info(std::move(info)) {}
ShardInfo info;
uint16_t chunk_id;
};

using ShardList = std::list< Shard >;
using ShardIterator = ShardList::iterator;

struct PG {
explicit PG(PGInfo info) : pg_info(std::move(info)) {}
PGInfo pg_info;
uint64_t shard_sequence_num{0};
ShardList shards;
};

class HomeObjectImpl : public HomeObject,
public BlobManager,
public PGManager,
public ShardManager,
public std::enable_shared_from_this< HomeObjectImpl > {

std::mutex _repl_lock;
std::shared_ptr< home_replication::ReplicationService > _repl_svc;

/// Implementation defines these
virtual ShardManager::Result< ShardInfo > _create_shard(pg_id, uint64_t size_bytes) = 0;
virtual ShardManager::Result< ShardInfo > _seal_shard(shard_id) = 0;
Expand All @@ -38,31 +46,35 @@ class HomeObjectImpl : public HomeObject,
virtual BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id) const = 0;
virtual BlobManager::NullResult _del_blob(ShardInfo const&, blob_id) = 0;
///

folly::Future< ShardManager::Result< ShardInfo > > _get_shard(shard_id id) const;
folly::Future< ShardManager::Result< Shard > > _get_shard(shard_id id) const;
auto _defer() const { return folly::makeSemiFuture().via(folly::getGlobalCPUExecutor()); }

protected:
std::mutex _repl_lock;
std::shared_ptr< home_replication::ReplicationService > _repl_svc;
//std::shared_ptr<homestore::ReplicationService> _repl_svc;
peer_id _our_id;

/// Our SvcId retrieval and SvcId->IP mapping
std::weak_ptr< HomeObjectApplication > _application;

///
mutable std::shared_mutex _pg_lock;
using shard_set = std::unordered_set< ShardInfo >;
using pg_pair = std::pair< PGInfo, shard_set >;
std::map< pg_id, pg_pair > _pg_map;
std::map< pg_id, PG > _pg_map;

mutable std::shared_mutex _shard_lock;
std::map< shard_id, shard_set::const_iterator > _shard_map;
std::map < shard_id, ShardIterator > _shard_map;
///

PGManager::Result< PG > _get_pg(pg_id pg);
public:
explicit HomeObjectImpl(std::weak_ptr< HomeObjectApplication >&& application) :
_application(std::move(application)) {}

~HomeObjectImpl() override = default;
HomeObjectImpl(const HomeObjectImpl&) = delete;
HomeObjectImpl(HomeObjectImpl&&) noexcept = delete;
HomeObjectImpl& operator=(const HomeObjectImpl&) = delete;
HomeObjectImpl& operator=(HomeObjectImpl&&) noexcept = delete;

// This is public but not exposed in the API above
void init_repl_svc();
Expand All @@ -82,7 +94,7 @@ class HomeObjectImpl : public HomeObject,
ShardManager::AsyncResult< ShardInfo > create_shard(pg_id pg_owner, uint64_t size_bytes) final;
ShardManager::AsyncResult< InfoList > list_shards(pg_id pg) const final;
ShardManager::AsyncResult< ShardInfo > seal_shard(shard_id id) final;

uint64_t get_current_timestamp();
/// BlobManager
BlobManager::AsyncResult< blob_id > put(shard_id shard, Blob&&) final;
BlobManager::AsyncResult< Blob > get(shard_id shard, blob_id const& blob, uint64_t off, uint64_t len) const final;
Expand Down
1 change: 1 addition & 0 deletions src/lib/homestore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE
homeobject.cpp
blob_manager.cpp
shard_manager.cpp
replication_state_machine.cpp
$<TARGET_OBJECTS:${PROJECT_NAME}_core>
)
target_link_libraries("${PROJECT_NAME}_homestore"
Expand Down
32 changes: 31 additions & 1 deletion src/lib/homestore/homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

#include <homestore/homestore.hpp>
#include <homestore/index_service.hpp>
#include <homestore/meta_service.hpp>
#include <iomgr/io_environment.hpp>

namespace homeobject {

const std::string HSHomeObject::s_shard_info_sub_type = "shard_info";

extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application) {
LOGINFOMOD(homeobject, "Initializing HomeObject");
auto instance = std::make_shared< HSHomeObject >(std::move(application));
Expand Down Expand Up @@ -40,7 +43,10 @@ void HSHomeObject::init_homestore() {
uint32_t services = HS_SERVICE::META | HS_SERVICE::LOG_REPLICATED | HS_SERVICE::LOG_LOCAL | HS_SERVICE::DATA;

bool need_format = HomeStore::instance()->start(
hs_input_params{.devices = device_info, .app_mem_size = app_mem_size, .services = services});
hs_input_params{.devices = device_info, .app_mem_size = app_mem_size, .services = services},
[this]() {
register_homestore_metablk_callback();
});

/// TODO how should this work?
LOGWARN("Persistence Looks Vacant, Formatting!!");
Expand All @@ -55,6 +61,17 @@ void HSHomeObject::init_homestore() {
}
}

void HSHomeObject::register_homestore_metablk_callback() {
//register some callbacks for metadata recovery;
using namespace homestore;
HomeStore::instance()->meta_service().register_handler(HSHomeObject::s_shard_info_sub_type,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_shard_meta_blk_found(mblk, buf, size);
},
nullptr,
true);
}

void HomeObjectImpl::init_repl_svc() {
auto lg = std::scoped_lock(_repl_lock);
if (!_repl_svc) {
Expand All @@ -72,4 +89,17 @@ HSHomeObject::~HSHomeObject() {
iomanager.stop();
}

void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
std::string shard_info_str;
shard_info_str.append(r_cast<const char*>(buf.bytes()), size);

auto shard = deserialize_shard(shard_info_str);
if (shard.info.state == ShardInfo::State::OPEN) {
// create shard;
do_commit_new_shard(shard);
} else {
do_commit_seal_shard(shard);
}
}

} // namespace homeobject
29 changes: 28 additions & 1 deletion src/lib/homestore/homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

#include "lib/homeobject_impl.hpp"

namespace homestore {
struct meta_blk;
}

namespace homeobject {

class HSHomeObject : public HomeObjectImpl {
Expand All @@ -18,12 +22,35 @@ class HSHomeObject : public HomeObjectImpl {
BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id) const override;
BlobManager::NullResult _del_blob(ShardInfo const&, blob_id) override;
///

mutable std::shared_mutex _flying_shard_lock;
std::map< int64_t, Shard > _flying_shards;
private:
shard_id generate_new_shard_id(pg_id pg);
shard_id make_new_shard_id(pg_id pg, uint64_t sequence_num) const;
uint64_t get_sequence_num_from_shard_id(uint64_t shard_id);
std::string serialize_shard(const Shard& shard) const;
Shard deserialize_shard(const std::string& shard_info_str) const;
void do_commit_new_shard(const Shard& shard);
void do_commit_seal_shard(const Shard& shard);
void register_homestore_metablk_callback();
public:
using HomeObjectImpl::HomeObjectImpl;
zichanglai marked this conversation as resolved.
Show resolved Hide resolved
~HSHomeObject();

void init_homestore();

static const std::string s_shard_info_sub_type;
void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size);

bool precheck_and_decode_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
std::string* msg);

void on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
void* user_ctx);
void on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
void* user_ctx);
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
void* user_ctx);
};

} // namespace homeobject
31 changes: 31 additions & 0 deletions src/lib/homestore/replication_message.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include "homeobject/common.hpp"

#include <sisl/utility/enum.hpp>
#include <isa-l/crc.h>

namespace homeobject {

ENUM(ReplicationMessageType, uint16_t, PG_MESSAGE = 0, SHARD_MESSAGE, BLOB_MESSAGE, UNKNOWN_MESSAGE);

// magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum'
static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34;
static constexpr uint32_t HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1 = 0x01;
static constexpr uint32_t init_crc32 = 0;

struct ReplicationMessageHeader {
uint64_t magic_num{HOMEOBJECT_REPLICATION_MAGIC};
uint32_t protocol_version{HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1};
ReplicationMessageType message_type;
uint32_t payload_size;
uint32_t payload_crc;
uint8_t reserved_pad[6]{};
uint32_t header_crc;
uint32_t calculate_crc() const {
return crc32_ieee(init_crc32, r_cast<const unsigned char*>(this), sizeof(*this) - sizeof(header_crc));
}
};


}
60 changes: 60 additions & 0 deletions src/lib/homestore/replication_state_machine.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#include "replication_message.hpp"
#include "replication_state_machine.hpp"

namespace homeobject {

void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
const home_replication::pba_list_t& pbas, void* ctx) {
LOGINFO("applying raft log commit with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast<const ReplicationMessageHeader*>(header.bytes);

switch (msg_header->message_type) {
case ReplicationMessageType::SHARD_MESSAGE: {
_home_object->on_shard_message_commit(lsn, header, key,ctx);
break;
}
case ReplicationMessageType::PG_MESSAGE:
case ReplicationMessageType::BLOB_MESSAGE:
default: {
break;
}
}
}

void ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {
LOGINFO("on_pre_commit with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast<const ReplicationMessageHeader*>(header.bytes);

switch (msg_header->message_type) {
case ReplicationMessageType::SHARD_MESSAGE: {
_home_object->on_pre_commit_shard_msg(lsn, header, key,ctx);
break;
}
case ReplicationMessageType::PG_MESSAGE:
case ReplicationMessageType::BLOB_MESSAGE:
default: {
break;
}
}
}

void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {
LOGINFO("rollback with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast<const ReplicationMessageHeader*>(header.bytes);

switch (msg_header->message_type) {
case ReplicationMessageType::SHARD_MESSAGE: {
_home_object->on_rollback_shard_msg(lsn, header, key,ctx);
break;
}
case ReplicationMessageType::PG_MESSAGE:
case ReplicationMessageType::BLOB_MESSAGE:
default: {
break;
}
}
}

void ReplicationStateMachine::on_replica_stop() {}

}
Loading