Skip to content

Commit

Permalink
SDSTOR-11601 add create_shard implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
zichanglai committed Sep 5, 2023
1 parent 1272ed1 commit 9945e64
Show file tree
Hide file tree
Showing 11 changed files with 327 additions and 40 deletions.
8 changes: 8 additions & 0 deletions src/include/homeobject/pg_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <sisl/utility/enum.hpp>

#include "common.hpp"
#include "shard_manager.hpp"

namespace homeobject {

Expand All @@ -27,6 +28,13 @@ struct PGInfo {
mutable MemberSet members;
};

/// In-memory record for one PG: the PGInfo it was created from plus the shard bookkeeping declared below.
struct PG {
/// Takes `info` by value and moves it into `pg_info`.
explicit PG(PGInfo info) : pg_info(std::move(info)) {}
/// The PGInfo this object was constructed from.
PGInfo pg_info;
/// Starts at 0. NOTE(review): by its name this is presumably the sequence number used when the next shard is
/// created in this PG — confirm against the create_shard implementation.
uint64_t next_sequence_num_for_new_shard{0};
/// List of shared_ptr< ShardInfo > entries (see CShardInfoList); presumably the shards that belong to this
/// PG — TODO confirm.
CShardInfoList shards;
};

class PGManager : public Manager< PGError > {
public:
virtual NullAsyncResult create_pg(PGInfo&& pg_info) = 0;
Expand Down
6 changes: 5 additions & 1 deletion src/include/homeobject/shard_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@ struct ShardInfo {
uint64_t deleted_capacity_bytes;
std::optional< peer_id > current_leader{std::nullopt};
};
using InfoList = std::list< ShardInfo >;

using InfoList = std::list<ShardInfo>;
using CShardInfo = std::shared_ptr<ShardInfo>;
using CShardInfoList = std::list< CShardInfo >;

class ShardManager : public Manager< ShardError > {
public:
static uint64_t max_shard_size(); // Static function forces runtime evaluation.
static uint64_t max_shard_num_in_pg();

virtual AsyncResult< ShardInfo > get_shard(shard_id id) const = 0;
virtual AsyncResult< InfoList > list_shards(pg_id id) const = 0;
Expand Down
15 changes: 7 additions & 8 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <sisl/logging/logging.h>
#include <home_replication/repl_service.h>


template <>
struct std::hash< homeobject::ShardInfo > {
std::size_t operator()(homeobject::ShardInfo const& i) const noexcept { return std::hash< uint64_t >()(i.id); }
Expand All @@ -24,9 +25,6 @@ class HomeObjectImpl : public HomeObject,
public ShardManager,
public std::enable_shared_from_this< HomeObjectImpl > {

std::mutex _repl_lock;
std::shared_ptr< home_replication::ReplicationService > _repl_svc;

/// Implementation defines these
virtual folly::Future< ShardManager::Result< ShardInfo > > _get_shard(shard_id) const = 0;
virtual ShardManager::Result< ShardInfo > _create_shard(pg_id, uint64_t size_bytes) = 0;
Expand All @@ -39,21 +37,21 @@ class HomeObjectImpl : public HomeObject,
///

protected:
std::mutex _repl_lock;
std::shared_ptr< home_replication::ReplicationService > _repl_svc;
//std::shared_ptr<homestore::ReplicationService> _repl_svc;
peer_id _our_id;

/// Our SvcId retrieval and SvcId->IP mapping
std::weak_ptr< HomeObjectApplication > _application;

///
mutable std::shared_mutex _pg_lock;
using shard_set = std::unordered_set< ShardInfo >;
using pg_pair = std::pair< PGInfo, shard_set >;
std::map< pg_id, pg_pair > _pg_map;
std::map< pg_id, PG > _pg_map;

mutable std::shared_mutex _shard_lock;
std::map< shard_id, shard_set::const_iterator > _shard_map;
std::map< shard_id, CShardInfo > _shard_map;
///

public:
explicit HomeObjectImpl(std::weak_ptr< HomeObjectApplication >&& application) :
_application(std::move(application)) {}
Expand All @@ -79,6 +77,7 @@ class HomeObjectImpl : public HomeObject,
ShardManager::AsyncResult< InfoList > list_shards(pg_id pg) const final;
ShardManager::AsyncResult< ShardInfo > seal_shard(shard_id id) final;

static const std::string s_shard_info_sub_type;
/// BlobManager
BlobManager::AsyncResult< blob_id > put(shard_id shard, Blob&&) final;
BlobManager::AsyncResult< Blob > get(shard_id shard, blob_id const& blob, uint64_t off, uint64_t len) const final;
Expand Down
12 changes: 12 additions & 0 deletions src/lib/homestore/homeobject.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
#include "homeobject.hpp"

#include <homestore/homestore.hpp>
#include <homestore/meta_service.hpp>

namespace homeobject {

// Sub-type name under which the shard-info handler is registered with the homestore meta service
// (see the register_handler() call in init_homeobject()).
const std::string HomeObjectImpl::s_shard_info_sub_type = "shard_info";

extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application) {
LOGINFOMOD(homeobject, "Initializing HomeObject");
//register some callbacks for metadata recovery;
homestore::hs()->meta_service().register_handler(HomeObjectImpl::s_shard_info_sub_type,
HSHomeObject::on_shard_meta_blk_found,
nullptr,
true);
auto instance = std::make_shared< HSHomeObject >(std::move(application));
instance->init_repl_svc();
return instance;
Expand All @@ -20,4 +30,6 @@ void HomeObjectImpl::init_repl_svc() {
}
}

// Handler passed to the homestore meta service's register_handler() in init_homeobject() for the
// s_shard_info_sub_type sub type. Currently an empty stub: all three arguments are ignored.
// NOTE(review): presumably this should rebuild in-memory shard state during recovery — confirm intended behavior.
void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {}

} // namespace homeobject
13 changes: 12 additions & 1 deletion src/lib/homestore/homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

#include "lib/homeobject_impl.hpp"

namespace homestore {
struct meta_blk;
}

namespace homeobject {

class HSHomeObject : public HomeObjectImpl {
Expand All @@ -20,10 +24,17 @@ class HSHomeObject : public HomeObjectImpl {
BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id) const override;
BlobManager::NullResult _del_blob(ShardInfo const&, blob_id) override;
///

private:
bool check_if_pg_exist(pg_id pg);
shard_id generate_new_shard_id(pg_id pg);
shard_id make_new_shard_id(pg_id pg, uint64_t sequence_num);
std::string prepare_create_shard_message(pg_id pg, shard_id new_shard_id, uint64_t shard_size);
public:
using HomeObjectImpl::HomeObjectImpl;
~HSHomeObject() override = default;
static void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size);
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
sisl::sg_list const& value, void* user_ctx);
};

} // namespace homeobject
26 changes: 26 additions & 0 deletions src/lib/homestore/replication_message.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include <cstddef>
#include <cstdint>

#include <isa-l/crc.h>
#include <sisl/utility/enum.hpp>

namespace homeobject {

ENUM(ReplicationMessageType, uint16_t, PG_MESSAGE = 0, SHARD_MESSAGE, BLOB_MESSAGE, UNKNOWN_MESSAGE);

// magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum'
static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34;

/// Fixed-size header placed in front of a HomeObject replication message.
///
/// `header_crc` is meant to protect the fields that precede it; it is produced by calculate_crc().
/// NOTE(review): `message_size` / `message_crc` presumably describe the payload that follows the header —
/// confirm against the code that fills them in.
///
/// The struct is not packed, so the compiler may insert padding between fields. Those padding bytes fall
/// inside the range hashed by calculate_crc(); callers that need a reproducible CRC should zero the
/// underlying storage before populating the header.
struct ReplicationMessageHeader {
    /// Constant marker used to recognise a HomeObject replication header.
    uint64_t magic_num{HOMEOBJECT_REPLICATION_MAGIC};
    /// Kind of message; defaults to UNKNOWN_MESSAGE until the sender sets it.
    ReplicationMessageType message_type{ReplicationMessageType::UNKNOWN_MESSAGE};
    uint32_t message_size{0};
    uint32_t message_crc{0};
    /// Reserved bytes, always zero-initialized.
    uint8_t reserved_pad[2]{};
    /// CRC of every byte of this header that precedes this field (see calculate_crc()).
    uint32_t header_crc{0};

    /// @brief Computes the CRC32 (IEEE) of the header bytes located before `header_crc`.
    ///
    /// The length is taken from the offset of `header_crc` rather than `sizeof(*this) - sizeof(header_crc)`:
    /// the struct has trailing padding, so the subtraction would make the hashed range include `header_crc`
    /// itself and the result would change as soon as the CRC is stored.
    ///
    /// @return CRC value to store in / compare against `header_crc`.
    uint32_t calculate_crc() const {
        return crc32_ieee(0, r_cast< const unsigned char* >(this), offsetof(ReplicationMessageHeader, header_crc));
    }
};


}
34 changes: 34 additions & 0 deletions src/lib/homestore/replication_state_machine.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include "replication_state_machine.hpp"
#include "homeobject_impl.hpp"

namespace homeobject {

void ReplicationStateMachine::on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
blkid_list_t const& blkids,void* ctx) {
const ReplicationMessageHeader* msg_header = r_cast<const ReplicationMessageHeader*>(header.bytes);
//TODO: read data from pba or get data from replication context(prefered)
sisl::sg_list value;

switch (msg_header->message_type) {
case SHARD_MESSAGE: {
home_object->on_shard_message_commit(lsn, header, key, value, ctx);
}
case PG_MESSAGE:
case BLOB_MESSAGE:
default: {
break;
}
}
}

// Pre-commit hook. Currently an empty stub: all arguments are ignored.
void ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {}

// Rollback hook. Currently an empty stub: all arguments are ignored and nothing is released.
void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx) {}

// Returns a default-constructed blk_alloc_hints; the supplied header is not inspected.
blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header) {
return blk_alloc_hints();
}

// Stop hook. Currently an empty stub.
void ReplicationStateMachine::on_replica_stop() {}

}
78 changes: 78 additions & 0 deletions src/lib/homestore/replication_state_machine.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#pragma once

#include "homestore/replication/repl_dev.h"


namespace homeobject {

class HomeObjectImpl;

class ReplicationStateMachine : public homestore::ReplicaDevListener {
public:
/// @brief Called when the log entry has been committed in the replica set.
///
/// This function is called from a dedicated commit thread which is different from the original thread calling
/// replica_set::write(). There is only one commit thread, and lsn is guaranteed to be monotonically increasing.
///
/// @param lsn - The log sequence number
/// @param header - Header originally passed with replica_set::write() api
/// @param key - Key originally passed with replica_set::write() api
/// @param pbas - List of pbas where data is written to the storage engine.
/// @param ctx - User contenxt passed as part of the replica_set::write() api
///
virtual void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, blkid_list_t const& blkids,
void* ctx);

/// @brief Called when the log entry has been received by the replica dev.
///
/// On recovery, this is called from a random worker thread before the raft server is started. It is
/// guaranteed to be serialized in log index order.
///
/// On the leader, this is called from the same thread that replica_set::write() was called.
///
/// On the follower, this is called when the follower has received the log entry. It is guaranteed to be serialized
/// in log sequence order.
///
/// NOTE: Listener can choose to ignore this pre commit, however, typical use case of maintaining this is in-case
/// replica set needs to support strong consistent reads and follower needs to ignore any keys which are not being
/// currently in pre-commit, but yet to be committed.
///
/// @param lsn - The log sequence number
/// @param header - Header originally passed with replica_set::write() api
/// @param key - Key originally passed with replica_set::write() api
/// @param ctx - User contenxt passed as part of the replica_set::write() api
virtual void on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx);

/// @brief Called when the log entry has been rolled back by the replica set.
///
/// This function is called on followers only when the log entry is going to be overwritten. This function is called
/// from a random worker thread, but is guaranteed to be serialized.
///
/// For each log index, it is guaranteed that either on_commit() or on_rollback() is called but not both.
///
/// NOTE: Listener should do the free any resources created as part of pre-commit.
///
/// @param lsn - The log sequence number getting rolled back
/// @param header - Header originally passed with replica_set::write() api
/// @param key - Key originally passed with replica_set::write() api
/// @param ctx - User contenxt passed as part of the replica_set::write() api
virtual void on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, void* ctx);

/// @brief Called when replication module is trying to allocate a block to write the value
///
/// This function can be called both on leader and follower when it is trying to allocate a block to write the
/// value. Callee is expected to provide hints for allocation based on the header supplied as part of original
/// write. In cases where callee don't care about the hints can return default blk_alloc_hints.
///
/// @param header Header originally passed with repl_dev::write() api on the leader
/// @return Expected to return blk_alloc_hints for this write
virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header);

/// @brief Called when the replica set is being stopped
virtual void on_replica_stop();

private:
HomeObjectImpl* home_object;
};

}
Loading

0 comments on commit 9945e64

Please sign in to comment.