Skip to content

Commit

Permalink
SoloReplDev - a pass through replication device (#177)
Browse files Browse the repository at this point in the history
Provide implementation of ReplicationService and introduced a SoloReplDev implementation to have a non-replicated version of same APIs. As a result the HomeStore startup is modified as well.
  • Loading branch information
hkadayam authored Sep 21, 2023
1 parent 4b84609 commit 319a3d5
Show file tree
Hide file tree
Showing 33 changed files with 1,201 additions and 405 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "4.3.2"
version = "4.4.0"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ add_subdirectory(lib/logstore)
add_subdirectory(lib/meta)
add_subdirectory(lib/index)
add_subdirectory(lib/blkdata_svc/)
add_subdirectory(lib/replication/)

add_subdirectory(tests)
set(HOMESTORE_OBJECTS
Expand All @@ -46,6 +47,7 @@ set(HOMESTORE_OBJECTS
$<TARGET_OBJECTS:hs_checkpoint>
$<TARGET_OBJECTS:hs_index>
$<TARGET_OBJECTS:hs_datasvc>
$<TARGET_OBJECTS:hs_replication>
lib/homestore.cpp
#$<TARGET_OBJECTS:hs_cp>
#$<TARGET_OBJECTS:indx_mgr>
Expand Down
15 changes: 6 additions & 9 deletions src/include/homestore/blkdata_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,20 @@ struct vdev_info;
struct stream_info_t;
class BlkReadTracker;
struct blk_alloc_hints;

using blk_t = uint64_t;
using blk_list_t = folly::small_vector< blk_t, 4 >;
class ChunkSelector;

class BlkDataService {
public:
BlkDataService();
BlkDataService(shared< ChunkSelector > custom_chunk_selector);
~BlkDataService();

/**
* @brief : called in non-recovery mode to create a new vdev for data service
*
* @param size : size of this vdev
*/
void create_vdev(uint64_t size, homestore::blk_allocator_type_t alloc_type,
homestore::chunk_selector_type_t chunk_sel_type);
void create_vdev(uint64_t size, uint32_t blk_size, blk_allocator_type_t alloc_type,
chunk_selector_type_t chunk_sel_type);

/**
* @brief : called during recovery to open existing vdev for data service
Expand Down Expand Up @@ -113,7 +111,7 @@ class BlkDataService {
*
* @return : the block list that have the blocks;
*/
blk_list_t alloc_blks(uint32_t size);
BlkAllocStatus alloc_blks(uint32_t size, blk_alloc_hints const& hints, MultiBlkId& out_blkids);

/**
* @brief : asynchronous free block, it is asynchronous because it might need to wait for pending read to complete
Expand Down Expand Up @@ -144,15 +142,14 @@ class BlkDataService {
void start();

private:
BlkAllocStatus alloc_blks(uint32_t size, blk_alloc_hints const& hints, MultiBlkId& out_blkids);

void init();

static void process_data_completion(std::error_condition ec, void* cookie);

private:
std::shared_ptr< VirtualDev > m_vdev;
std::unique_ptr< BlkReadTracker > m_blk_read_tracker;
std::shared_ptr< ChunkSelector > m_custom_chunk_selector;
uint32_t m_blk_size;
};

Expand Down
8 changes: 5 additions & 3 deletions src/include/homestore/checkpoint/cp_mgr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,13 @@ class CPCallbacks {
virtual std::unique_ptr< CPContext > on_switchover_cp(CP* cur_cp, CP* new_cp) = 0;

/// @brief After gathering CPContext from all consumers, CPManager calls this method to flush the dirty buffers
/// accumulated in this CP. Once CP flush is completed, consumers are required to call the flush_done callback.
/// accumulated in this CP. Once CP flush is completed, consumers are required to set the promise corresponding to
/// returned future.
/// @param cp CP pointer to which the dirty buffers have to be flushed
/// @param done_cb Callback after cp is done
virtual folly::Future< bool > cp_flush(CP* cp) = 0;

/// @brief After flushed the CP, CPManager calls this method to clean up any CP related structures
/// @brief After all consumers flushed the CP, CPManager calls this method to clean up any CP related structures
/// @param cp
virtual void cp_cleanup(CP* cp) = 0;

Expand Down Expand Up @@ -164,9 +165,10 @@ class CPManager {
std::vector< iomgr::io_fiber_t > m_cp_io_fibers;

public:
CPManager(bool first_time_boot);
CPManager();
virtual ~CPManager();

void start(bool first_time_boot);
/// @brief Shutdown the checkpoint manager services. It will not trigger a flush, but cancels any existing
/// checkpoint session abruptly. If caller needs clean shutdown, then they explicitly needs to trigger cp flush
/// before calling shutdown.
Expand Down
51 changes: 44 additions & 7 deletions src/include/homestore/homestore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,14 @@ class MetaBlkService;
class LogStoreService;
class BlkDataService;
class IndexService;
class ReplicationServiceImpl;
class IndexServiceCallbacks;
class ReplServiceCallbacks;
struct vdev_info;
class HomeStore;
class CPManager;
class VirtualDev;
class ChunkSelector;

using HomeStoreSafePtr = std::shared_ptr< HomeStore >;

Expand All @@ -63,8 +66,37 @@ struct hs_vdev_context {
};
#pragma pack()

typedef std::function< void(void) > hs_before_services_starting_cb_t;
typedef std::function< void(bool success) > hs_comp_callback;
using hs_before_services_starting_cb_t = std::function< void(void) >;

struct HS_SERVICE {
static constexpr uint32_t META = 1 << 0;
static constexpr uint32_t LOG_REPLICATED = 1 << 1;
static constexpr uint32_t LOG_LOCAL = 1 << 2;
static constexpr uint32_t DATA = 1 << 3;
static constexpr uint32_t INDEX = 1 << 4;
static constexpr uint32_t REPLICATION = 1 << 5;

uint32_t svcs;

HS_SERVICE() : svcs{META} {}

std::string list() const {
std::string str;
if (svcs & META) { str += "meta,"; }
if (svcs & DATA) { str += "data,"; }
if (svcs & INDEX) { str += "index,"; }
if (svcs & LOG_REPLICATED) { str += "log_replicated,"; }
if (svcs & LOG_LOCAL) { str += "log_local,"; }
if (svcs & REPLICATION) { str += "replication,"; }
return str;
}
};

VENUM(repl_impl_type, uint8_t,
server_side, // Completely homestore controlled replication
client_assisted, // Client assisting in replication
solo // For single node - no replication
);

/*
* IO errors handling by homestore.
Expand All @@ -82,6 +114,7 @@ class HomeStore {
std::unique_ptr< MetaBlkService > m_meta_service;
std::unique_ptr< LogStoreService > m_log_service;
std::unique_ptr< IndexService > m_index_service;
std::unique_ptr< ReplicationServiceImpl > m_repl_service;

std::unique_ptr< DeviceManager > m_dev_mgr;
shared< sisl::logging::logger_t > m_periodic_logger;
Expand All @@ -90,8 +123,7 @@ class HomeStore {
std::unique_ptr< CPManager > m_cp_mgr;
shared< sisl::Evictor > m_evictor;

bool m_vdev_failed{false};

HS_SERVICE m_services; // Services homestore is starting with
hs_before_services_starting_cb_t m_before_services_starting_cb{nullptr};

public:
Expand All @@ -109,11 +141,14 @@ class HomeStore {
static shared< spdlog::logger >& periodic_logger() { return instance()->m_periodic_logger; }

///////////////////////////// Member functions /////////////////////////////////////////////
bool start(const hs_input_params& input, hs_before_services_starting_cb_t svcs_starting_cb = nullptr,
std::unique_ptr< IndexServiceCallbacks > cbs = nullptr);
HomeStore& with_data_service(cshared< ChunkSelector >& custom_chunk_selector = nullptr);
HomeStore& with_log_service();
HomeStore& with_index_service(std::unique_ptr< IndexServiceCallbacks > cbs);
HomeStore& with_repl_data_service(repl_impl_type repl_type, std::unique_ptr< ReplServiceCallbacks > cbs,
cshared< ChunkSelector >& custom_chunk_selector = nullptr);

bool start(const hs_input_params& input, hs_before_services_starting_cb_t svcs_starting_cb = nullptr);
void format_and_start(std::map< uint32_t, hs_format_params >&& format_opts);

void shutdown();

// cap_attrs get_system_capacity() const; // Need to move this to homeblks/homeobj
Expand All @@ -124,11 +159,13 @@ class HomeStore {
bool has_data_service() const;
bool has_meta_service() const;
bool has_log_service() const;
bool has_repl_data_service() const;

BlkDataService& data_service() { return *m_data_service; }
MetaBlkService& meta_service() { return *m_meta_service; }
LogStoreService& logstore_service() { return *m_log_service; }
IndexService& index_service() { return *m_index_service; }
ReplicationServiceImpl& repl_service() { return *m_repl_service; }
DeviceManager* device_mgr() { return m_dev_mgr.get(); }
ResourceMgr& resource_mgr() { return *m_resource_mgr.get(); }
CPManager& cp_mgr() { return *m_cp_mgr.get(); }
Expand Down
34 changes: 1 addition & 33 deletions src/include/homestore/homestore_decl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ ENUM(blk_allocator_type_t, uint8_t, none, fixed, varsize, append);
ENUM(chunk_selector_type_t, uint8_t, // What are the options to select chunk to allocate a block
NONE, // Caller want nothing to be set
ROUND_ROBIN, // Pick round robin
HEAP, // Heap chunk selector
CUSTOM, // Controlled by the upper layer
RANDOM, // Pick any chunk in uniformly random fashion
MOST_AVAILABLE_SPACE, // Pick the most available space
Expand Down Expand Up @@ -143,40 +142,10 @@ static std::string in_bytes(uint64_t sz) {
return _format_decimals(size / arr.back().first, arr.back().second);
}

struct HS_SERVICE {
static constexpr uint32_t META = 1 << 0;
static constexpr uint32_t LOG_REPLICATED = 1 << 1;
static constexpr uint32_t LOG_LOCAL = 1 << 2;
static constexpr uint32_t DATA = 1 << 3;
static constexpr uint32_t INDEX = 1 << 4;
static constexpr uint32_t REPLICATION = 1 << 5;

uint32_t svcs;

HS_SERVICE() : svcs{META} {}
HS_SERVICE(uint32_t val) : svcs{val} {
svcs |= META; // Force meta to be present always
if (svcs & REPLICATION) {
svcs |= LOG_REPLICATED | LOG_LOCAL;
svcs &= ~DATA; // ReplicationDataSvc or DataSvc only one of them
}
}

std::string list() const {
std::string str;
if (svcs & META) { str += "meta,"; }
if (svcs & DATA) { str += "data,"; }
if (svcs & INDEX) { str += "index,"; }
if (svcs & LOG_REPLICATED) { str += "log_replicated,"; }
if (svcs & LOG_LOCAL) { str += "log_local,"; }
if (svcs & REPLICATION) { str += "replication,"; }
return str;
}
};

struct hs_format_params {
float size_pct;
uint32_t num_chunks{1};
uint32_t block_size{0};
blk_allocator_type_t alloc_type{blk_allocator_type_t::varsize};
chunk_selector_type_t chunk_sel_type{chunk_selector_type_t::ROUND_ROBIN};
};
Expand All @@ -192,7 +161,6 @@ struct hs_input_params {
uint64_t hugepage_size{0}; // memory available for the hugepage
bool is_read_only{false}; // Is read only
bool auto_recovery{true}; // Recovery of data is automatic or controlled by the caller
HS_SERVICE services; // Services homestore is starting with

#ifdef _PRERELEASE
bool force_reinit{false};
Expand Down
5 changes: 4 additions & 1 deletion src/include/homestore/index_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ class VirtualDev;
class IndexServiceCallbacks {
public:
virtual ~IndexServiceCallbacks() = default;
virtual std::shared_ptr< IndexTableBase > on_index_table_found(const superblk< index_table_sb >& cb) = 0;
virtual std::shared_ptr< IndexTableBase > on_index_table_found(superblk< index_table_sb > const&) {
assert(0);
return nullptr;
}
};

class IndexService {
Expand Down
8 changes: 4 additions & 4 deletions src/include/homestore/replication/repl_decls.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#include <folly/small_vector.h>
#include <sisl/logging/logging.h>
#include <iomgr/iomgr_types.hpp>
#include <homestore/homestore_decl.hpp>
#include <homestore/blk.h>
#include <sisl/fds/buffer.hpp>
Expand All @@ -18,9 +17,10 @@ using blkid_list_t = folly::small_vector< BlkId, 4 >;

// Fully qualified domain pba, unique pba id across replica set
struct RemoteBlkId {
RemoteBlkId() = default;
RemoteBlkId(uint32_t s, const BlkId& b) : server_id{s}, blkid{b} {}
uint32_t server_id;
BlkId blkid;
uint32_t server_id{0};
MultiBlkId blkid;

bool operator==(RemoteBlkId const& o) const { return (server_id == o.server_id) && (blkid == o.blkid); }
};
Expand All @@ -38,7 +38,7 @@ namespace std {
template <>
struct hash< homestore::RemoteBlkId > {
size_t operator()(homestore::RemoteBlkId const& fqbid) const noexcept {
return std::hash< uint64_t >()(fqbid.server_id) + std::hash< uint64_t >()(fqbid.blkid.to_integer());
return std::hash< uint64_t >()(fqbid.server_id) + std::hash< homestore::MultiBlkId >()(fqbid.blkid);
}
};
} // namespace std
31 changes: 18 additions & 13 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#pragma once

#include <nuraft_mesg/messaging_if.hpp>
// #include <nuraft_mesg/messaging_if.hpp>
#include <sisl/fds/buffer.hpp>

#include <homestore/replication/repl_decls.h>

namespace home_replication {

namespace homestore {
//
// Callbacks to be implemented by ReplDev users.
//
Expand All @@ -25,7 +24,7 @@ class ReplDevListener {
/// @param blkids - List of blkids where data is written to the storage engine.
/// @param ctx - User contenxt passed as part of the replica_set::write() api
///
virtual void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, blkid_list_t const& blkids,
virtual void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids,
void* ctx) = 0;

/// @brief Called when the log entry has been received by the replica dev.
Expand Down Expand Up @@ -71,14 +70,15 @@ class ReplDevListener {
///
/// @param header Header originally passed with repl_dev::write() api on the leader
/// @return Expected to return blk_alloc_hints for this write
virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header) = 0;
virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, void* user_ctx) = 0;

/// @brief Called when the replica set is being stopped
virtual void on_replica_stop() = 0;
};

class ReplDev {
public:
ReplDev() = default;
virtual ~ReplDev() = default;

/// @brief Replicate the data to the replica set. This method goes through the
Expand All @@ -100,7 +100,7 @@ class ReplDev {
/// list size is 0, then only key is written to replicadev without data.
/// @param user_ctx - User supplied opaque context which will be passed to listener
/// callbacks
virtual void async_alloc_write(const sisl::blob& header, const sisl::blob& key, const sisl::sg_list& value,
virtual void async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value,
void* user_ctx) = 0;

/// @brief Reads the data and returns a future to continue on
Expand All @@ -109,24 +109,29 @@ class ReplDev {
/// @param size Total size of the data read
/// @param part_of_batch Is read is part of a batch. If part of the batch, then submit_batch needs to be called at
/// the end
/// @return A Future with bool to notify if it has successfully read the data, raises the exception in case of
/// failure
virtual folly::Future< bool > async_read(const BlkId& bid, sisl::sg_list& sgs, uint32_t size,
bool part_of_batch = false);
/// @return A Future with std::error_code to notify if it has successfully read the data or any error code in case
/// of failure
virtual folly::Future< std::error_code > async_read(MultiBlkId const& blkid, sisl::sg_list& sgs, uint32_t size,
bool part_of_batch = false) = 0;

/// @brief After data is replicated and on_commit to the listener is called. the blkids can be freed.
///
/// @param lsn - LSN of the old blkids that is being freed
/// @param blkids - blkids to be freed.
virtual void async_free_blks(int64_t lsn, const blkid_list_t& blkids) = 0;
virtual void async_free_blks(int64_t lsn, MultiBlkId const& blkid) = 0;

/// @brief Checks if this replica is the leader in this ReplDev
/// @return true or false
virtual bool is_leader() const = 0;

/// @brief Gets the group_id this repldev is working for
/// @return group_id
virtual std::string group_id() const = 0;
virtual uuid_t group_id() const = 0;

virtual void attach_listener(std::unique_ptr< ReplDevListener > listener) { m_listener = std::move(listener); }

protected:
std::unique_ptr< ReplDevListener > m_listener;
};

} // namespace home_replication
} // namespace homestore
Loading

0 comments on commit 319a3d5

Please sign in to comment.