Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SoloReplDev - a pass through replication device #177

Merged
merged 11 commits into from
Sep 21, 2023
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "4.3.2"
version = "4.4.0"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ add_subdirectory(lib/logstore)
add_subdirectory(lib/meta)
add_subdirectory(lib/index)
add_subdirectory(lib/blkdata_svc/)
add_subdirectory(lib/replication/)

add_subdirectory(tests)
set(HOMESTORE_OBJECTS
Expand All @@ -46,6 +47,7 @@ set(HOMESTORE_OBJECTS
$<TARGET_OBJECTS:hs_checkpoint>
$<TARGET_OBJECTS:hs_index>
$<TARGET_OBJECTS:hs_datasvc>
$<TARGET_OBJECTS:hs_replication>
lib/homestore.cpp
#$<TARGET_OBJECTS:hs_cp>
#$<TARGET_OBJECTS:indx_mgr>
Expand Down
15 changes: 6 additions & 9 deletions src/include/homestore/blkdata_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,20 @@ struct vdev_info;
struct stream_info_t;
class BlkReadTracker;
struct blk_alloc_hints;

using blk_t = uint64_t;
using blk_list_t = folly::small_vector< blk_t, 4 >;
class ChunkSelector;

class BlkDataService {
public:
BlkDataService();
BlkDataService(shared< ChunkSelector > custom_chunk_selector);
~BlkDataService();

/**
* @brief : called in non-recovery mode to create a new vdev for data service
*
* @param size : size of this vdev
*/
void create_vdev(uint64_t size, homestore::blk_allocator_type_t alloc_type,
homestore::chunk_selector_type_t chunk_sel_type);
void create_vdev(uint64_t size, uint32_t blk_size, blk_allocator_type_t alloc_type,
chunk_selector_type_t chunk_sel_type);

/**
* @brief : called during recovery to open existing vdev for data service
Expand Down Expand Up @@ -113,7 +111,7 @@ class BlkDataService {
*
* @return : the block list that have the blocks;
*/
blk_list_t alloc_blks(uint32_t size);
BlkAllocStatus alloc_blks(uint32_t size, blk_alloc_hints const& hints, MultiBlkId& out_blkids);
hkadayam marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief : asynchronous free block, it is asynchronous because it might need to wait for pending read to complete
Expand Down Expand Up @@ -144,15 +142,14 @@ class BlkDataService {
void start();

private:
BlkAllocStatus alloc_blks(uint32_t size, blk_alloc_hints const& hints, MultiBlkId& out_blkids);

void init();

static void process_data_completion(std::error_condition ec, void* cookie);

private:
std::shared_ptr< VirtualDev > m_vdev;
std::unique_ptr< BlkReadTracker > m_blk_read_tracker;
std::shared_ptr< ChunkSelector > m_custom_chunk_selector;
uint32_t m_blk_size;
};

Expand Down
8 changes: 5 additions & 3 deletions src/include/homestore/checkpoint/cp_mgr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,13 @@ class CPCallbacks {
virtual std::unique_ptr< CPContext > on_switchover_cp(CP* cur_cp, CP* new_cp) = 0;

/// @brief After gathering CPContext from all consumers, CPManager calls this method to flush the dirty buffers
/// accumulated in this CP. Once CP flush is completed, consumers are required to call the flush_done callback.
/// accumulated in this CP. Once CP flush is completed, consumers are required to set the promise corresponding to
/// returned future.
/// @param cp CP pointer to which the dirty buffers have to be flushed
/// @return Future whose promise the consumer sets once the flush of this CP is completed
virtual folly::Future< bool > cp_flush(CP* cp) = 0;

/// @brief After flushed the CP, CPManager calls this method to clean up any CP related structures
/// @brief After all consumers flushed the CP, CPManager calls this method to clean up any CP related structures
/// @param cp
virtual void cp_cleanup(CP* cp) = 0;

Expand Down Expand Up @@ -164,9 +165,10 @@ class CPManager {
std::vector< iomgr::io_fiber_t > m_cp_io_fibers;

public:
CPManager(bool first_time_boot);
CPManager();
virtual ~CPManager();

void start(bool first_time_boot);
/// @brief Shutdown the checkpoint manager services. It will not trigger a flush, but cancels any existing
/// checkpoint session abruptly. If the caller needs a clean shutdown, then it explicitly needs to trigger a cp flush
/// before calling shutdown.
Expand Down
51 changes: 44 additions & 7 deletions src/include/homestore/homestore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,14 @@ class MetaBlkService;
class LogStoreService;
class BlkDataService;
class IndexService;
class ReplicationServiceImpl;
class IndexServiceCallbacks;
class ReplServiceCallbacks;
struct vdev_info;
class HomeStore;
class CPManager;
class VirtualDev;
class ChunkSelector;

using HomeStoreSafePtr = std::shared_ptr< HomeStore >;

Expand All @@ -63,8 +66,37 @@ struct hs_vdev_context {
};
#pragma pack()

typedef std::function< void(void) > hs_before_services_starting_cb_t;
typedef std::function< void(bool success) > hs_comp_callback;
using hs_before_services_starting_cb_t = std::function< void(void) >;

/// Bit-flag set describing which services a HomeStore instance is started with.
/// Every constant occupies its own bit, so several of them can be OR-ed together into `svcs`.
struct HS_SERVICE {
    static constexpr uint32_t META = 1 << 0;
    static constexpr uint32_t LOG_REPLICATED = 1 << 1;
    static constexpr uint32_t LOG_LOCAL = 1 << 2;
    static constexpr uint32_t DATA = 1 << 3;
    static constexpr uint32_t INDEX = 1 << 4;
    static constexpr uint32_t REPLICATION = 1 << 5;

    uint32_t svcs; // OR-ed combination of the flags above

    /// A default constructed set contains only the meta service.
    HS_SERVICE() : svcs{META} {}

    /// @brief Builds a printable list of the enabled services.
    /// @return The name of every enabled service, each one followed by a comma (the last one included). The order
    /// is fixed: meta, data, index, log_replicated, log_local, replication.
    std::string list() const {
        std::string names;
        auto const append_if_set = [this, &names](uint32_t flag, char const* label) {
            if ((svcs & flag) != 0) { names += label; }
        };

        append_if_set(META, "meta,");
        append_if_set(DATA, "data,");
        append_if_set(INDEX, "index,");
        append_if_set(LOG_REPLICATED, "log_replicated,");
        append_if_set(LOG_LOCAL, "log_local,");
        append_if_set(REPLICATION, "replication,");
        return names;
    }
};

// Kind of replication implementation; this is the type of the repl_type argument taken by
// HomeStore::with_repl_data_service(). Stored as a uint8_t.
VENUM(repl_impl_type, uint8_t,
server_side, // Completely homestore controlled replication
client_assisted, // Client assisting in replication
solo // For single node - no replication
);

/*
* IO errors handling by homestore.
Expand All @@ -82,6 +114,7 @@ class HomeStore {
std::unique_ptr< MetaBlkService > m_meta_service;
std::unique_ptr< LogStoreService > m_log_service;
std::unique_ptr< IndexService > m_index_service;
std::unique_ptr< ReplicationServiceImpl > m_repl_service;

std::unique_ptr< DeviceManager > m_dev_mgr;
shared< sisl::logging::logger_t > m_periodic_logger;
Expand All @@ -90,8 +123,7 @@ class HomeStore {
std::unique_ptr< CPManager > m_cp_mgr;
shared< sisl::Evictor > m_evictor;

bool m_vdev_failed{false};

HS_SERVICE m_services; // Services homestore is starting with
hs_before_services_starting_cb_t m_before_services_starting_cb{nullptr};

public:
Expand All @@ -109,11 +141,14 @@ class HomeStore {
static shared< spdlog::logger >& periodic_logger() { return instance()->m_periodic_logger; }

///////////////////////////// Member functions /////////////////////////////////////////////
bool start(const hs_input_params& input, hs_before_services_starting_cb_t svcs_starting_cb = nullptr,
std::unique_ptr< IndexServiceCallbacks > cbs = nullptr);
HomeStore& with_data_service(cshared< ChunkSelector >& custom_chunk_selector = nullptr);
HomeStore& with_log_service();
HomeStore& with_index_service(std::unique_ptr< IndexServiceCallbacks > cbs);
HomeStore& with_repl_data_service(repl_impl_type repl_type, std::unique_ptr< ReplServiceCallbacks > cbs,
cshared< ChunkSelector >& custom_chunk_selector = nullptr);

bool start(const hs_input_params& input, hs_before_services_starting_cb_t svcs_starting_cb = nullptr);
void format_and_start(std::map< uint32_t, hs_format_params >&& format_opts);

void shutdown();

// cap_attrs get_system_capacity() const; // Need to move this to homeblks/homeobj
Expand All @@ -124,11 +159,13 @@ class HomeStore {
bool has_data_service() const;
bool has_meta_service() const;
bool has_log_service() const;
bool has_repl_data_service() const;

BlkDataService& data_service() { return *m_data_service; }
MetaBlkService& meta_service() { return *m_meta_service; }
LogStoreService& logstore_service() { return *m_log_service; }
IndexService& index_service() { return *m_index_service; }
ReplicationServiceImpl& repl_service() { return *m_repl_service; }
DeviceManager* device_mgr() { return m_dev_mgr.get(); }
ResourceMgr& resource_mgr() { return *m_resource_mgr.get(); }
CPManager& cp_mgr() { return *m_cp_mgr.get(); }
Expand Down
34 changes: 1 addition & 33 deletions src/include/homestore/homestore_decl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ ENUM(blk_allocator_type_t, uint8_t, none, fixed, varsize, append);
ENUM(chunk_selector_type_t, uint8_t, // What are the options to select chunk to allocate a block
NONE, // Caller want nothing to be set
ROUND_ROBIN, // Pick round robin
HEAP, // Heap chunk selector
CUSTOM, // Controlled by the upper layer
RANDOM, // Pick any chunk in uniformly random fashion
MOST_AVAILABLE_SPACE, // Pick the most available space
Expand Down Expand Up @@ -143,40 +142,10 @@ static std::string in_bytes(uint64_t sz) {
return _format_decimals(size / arr.back().first, arr.back().second);
}

/// Bit-flag set describing which services homestore is starting with.
/// Every constant occupies its own bit, so several of them can be OR-ed together into `svcs`.
struct HS_SERVICE {
    static constexpr uint32_t META = 1 << 0;
    static constexpr uint32_t LOG_REPLICATED = 1 << 1;
    static constexpr uint32_t LOG_LOCAL = 1 << 2;
    static constexpr uint32_t DATA = 1 << 3;
    static constexpr uint32_t INDEX = 1 << 4;
    static constexpr uint32_t REPLICATION = 1 << 5;

    uint32_t svcs; // OR-ed combination of the flags above

    /// A default constructed set contains only the meta service.
    HS_SERVICE() : svcs{META} {}

    /// @brief Builds the set from a raw bitmask and normalizes it.
    /// @param val OR-ed combination of the service flags requested by the caller
    HS_SERVICE(uint32_t val) : svcs{val | META} { // Meta is forced to be present always
        bool const wants_replication = (svcs & REPLICATION) != 0;
        if (!wants_replication) { return; }

        // Replication pulls in both log flavours and replaces the plain data service
        // (ReplicationDataSvc or DataSvc, only one of them).
        svcs = (svcs | LOG_REPLICATED | LOG_LOCAL) & ~DATA;
    }

    /// @brief Builds a printable list of the enabled services.
    /// @return The name of every enabled service, each one followed by a comma (the last one included). The order
    /// is fixed: meta, data, index, log_replicated, log_local, replication.
    std::string list() const {
        struct flag_name {
            uint32_t flag;
            char const* text;
        };
        static constexpr flag_name table[] = {
            {META, "meta,"},
            {DATA, "data,"},
            {INDEX, "index,"},
            {LOG_REPLICATED, "log_replicated,"},
            {LOG_LOCAL, "log_local,"},
            {REPLICATION, "replication,"},
        };

        std::string out;
        for (auto const& entry : table) {
            if (svcs & entry.flag) { out += entry.text; }
        }
        return out;
    }
};

/// Format-time parameters; HomeStore::format_and_start() takes a map of these keyed by a uint32_t.
/// Every member carries a default so that a default-initialized instance never holds an indeterminate value.
struct hs_format_params {
    float size_pct{0.0f}; // NOTE(review): presumably the percentage of capacity to assign - confirm. Callers are
                          // expected to override the 0 default.
    uint32_t num_chunks{1};
    uint32_t block_size{0}; // NOTE(review): 0 presumably means "use the default block size" - confirm
    blk_allocator_type_t alloc_type{blk_allocator_type_t::varsize};
    chunk_selector_type_t chunk_sel_type{chunk_selector_type_t::ROUND_ROBIN};
};
Expand All @@ -192,7 +161,6 @@ struct hs_input_params {
uint64_t hugepage_size{0}; // memory available for the hugepage
bool is_read_only{false}; // Is read only
bool auto_recovery{true}; // Recovery of data is automatic or controlled by the caller
HS_SERVICE services; // Services homestore is starting with

#ifdef _PRERELEASE
bool force_reinit{false};
Expand Down
5 changes: 4 additions & 1 deletion src/include/homestore/index_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ class VirtualDev;
class IndexServiceCallbacks {
public:
virtual ~IndexServiceCallbacks() = default;
virtual std::shared_ptr< IndexTableBase > on_index_table_found(const superblk< index_table_sb >& cb) = 0;
virtual std::shared_ptr< IndexTableBase > on_index_table_found(superblk< index_table_sb > const&) {
assert(0);
szmyd marked this conversation as resolved.
Show resolved Hide resolved
return nullptr;
}
};

class IndexService {
Expand Down
8 changes: 4 additions & 4 deletions src/include/homestore/replication/repl_decls.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#include <folly/small_vector.h>
#include <sisl/logging/logging.h>
#include <iomgr/iomgr_types.hpp>
#include <homestore/homestore_decl.hpp>
#include <homestore/blk.h>
#include <sisl/fds/buffer.hpp>
Expand All @@ -18,9 +17,10 @@ using blkid_list_t = folly::small_vector< BlkId, 4 >;

// Fully qualified domain pba, unique pba id across replica set
struct RemoteBlkId {
RemoteBlkId() = default;
RemoteBlkId(uint32_t s, const BlkId& b) : server_id{s}, blkid{b} {}
uint32_t server_id;
BlkId blkid;
uint32_t server_id{0};
MultiBlkId blkid;

bool operator==(RemoteBlkId const& o) const { return (server_id == o.server_id) && (blkid == o.blkid); }
};
Expand All @@ -38,7 +38,7 @@ namespace std {
template <>
struct hash< homestore::RemoteBlkId > {
size_t operator()(homestore::RemoteBlkId const& fqbid) const noexcept {
return std::hash< uint64_t >()(fqbid.server_id) + std::hash< uint64_t >()(fqbid.blkid.to_integer());
return std::hash< uint64_t >()(fqbid.server_id) + std::hash< homestore::MultiBlkId >()(fqbid.blkid);
}
};
} // namespace std
31 changes: 18 additions & 13 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#pragma once

#include <nuraft_mesg/messaging_if.hpp>
// #include <nuraft_mesg/messaging_if.hpp>
#include <sisl/fds/buffer.hpp>

#include <homestore/replication/repl_decls.h>

namespace home_replication {

namespace homestore {
//
// Callbacks to be implemented by ReplDev users.
//
Expand All @@ -25,7 +24,7 @@ class ReplDevListener {
/// @param blkids - List of blkids where data is written to the storage engine.
/// @param ctx - User context passed as part of the replica_set::write() api
///
virtual void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, blkid_list_t const& blkids,
virtual void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids,
void* ctx) = 0;

/// @brief Called when the log entry has been received by the replica dev.
Expand Down Expand Up @@ -71,14 +70,15 @@ class ReplDevListener {
///
/// @param header Header originally passed with repl_dev::write() api on the leader
/// @return Expected to return blk_alloc_hints for this write
virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header) = 0;
virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, void* user_ctx) = 0;

/// @brief Called when the replica set is being stopped
virtual void on_replica_stop() = 0;
};

class ReplDev {
public:
ReplDev() = default;
virtual ~ReplDev() = default;

/// @brief Replicate the data to the replica set. This method goes through the
Expand All @@ -100,7 +100,7 @@ class ReplDev {
/// list size is 0, then only key is written to replicadev without data.
/// @param user_ctx - User supplied opaque context which will be passed to listener
/// callbacks
virtual void async_alloc_write(const sisl::blob& header, const sisl::blob& key, const sisl::sg_list& value,
virtual void async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value,
void* user_ctx) = 0;

/// @brief Reads the data and returns a future to continue on
Expand All @@ -109,24 +109,29 @@ class ReplDev {
/// @param size Total size of the data read
/// @param part_of_batch Whether the read is part of a batch. If it is, then submit_batch needs to be called at
/// the end
/// @return A Future with bool to notify if it has successfully read the data, raises the exception in case of
/// failure
virtual folly::Future< bool > async_read(const BlkId& bid, sisl::sg_list& sgs, uint32_t size,
bool part_of_batch = false);
/// @return A Future with std::error_code to notify if it has successfully read the data or any error code in case
/// of failure
virtual folly::Future< std::error_code > async_read(MultiBlkId const& blkid, sisl::sg_list& sgs, uint32_t size,
bool part_of_batch = false) = 0;

/// @brief After the data is replicated and on_commit of the listener has been called, the blkids can be freed.
///
/// @param lsn - LSN of the old blkids that is being freed
/// @param blkids - blkids to be freed.
virtual void async_free_blks(int64_t lsn, const blkid_list_t& blkids) = 0;
virtual void async_free_blks(int64_t lsn, MultiBlkId const& blkid) = 0;

/// @brief Checks if this replica is the leader in this ReplDev
/// @return true or false
virtual bool is_leader() const = 0;

/// @brief Gets the group_id this repldev is working for
/// @return group_id
virtual std::string group_id() const = 0;
virtual uuid_t group_id() const = 0;

virtual void attach_listener(std::unique_ptr< ReplDevListener > listener) { m_listener = std::move(listener); }

protected:
std::unique_ptr< ReplDevListener > m_listener;
};

} // namespace home_replication
} // namespace homestore
Loading