Skip to content

Commit

Permalink
Merge branch 'master' into yk_cp_trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
yamingk authored Sep 27, 2023
2 parents 1c2774e + db23fc8 commit b9a77f2
Show file tree
Hide file tree
Showing 18 changed files with 360 additions and 235 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "4.4.1"
version = "4.5.1"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
2 changes: 1 addition & 1 deletion src/include/homestore/btree/btree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class Btree {
// to overcome the gcc bug, pointer here: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=66944
static BtreeThreadVariables* bt_thread_vars() {
auto this_id(boost::this_fiber::get_id());
static thread_local std::map< fibers::fiber::id, std::unique_ptr< BtreeThreadVariables > > fiber_map;
static thread_local std::map< boost::fibers::fiber::id, std::unique_ptr< BtreeThreadVariables > > fiber_map;
if (fiber_map.count(this_id)) { return fiber_map[this_id].get(); }
fiber_map[this_id] = std::make_unique< BtreeThreadVariables >();
return fiber_map[this_id].get();
Expand Down
3 changes: 3 additions & 0 deletions src/include/homestore/btree/detail/simple_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ class SimpleNode : public BtreeNode {
}

std::string to_string_keys(bool print_friendly = false) const override {
#if 0
std::string delimiter = print_friendly ? "\n" : "\t";
auto str = fmt::format("{}{} nEntries={} {} ",
print_friendly ? "------------------------------------------------------------\n" : "",
Expand Down Expand Up @@ -314,6 +315,8 @@ class SimpleNode : public BtreeNode {
fmt::format_to(std::back_inserter(str), "-{}]", cur_key);
}
return str;
#endif
return {};
}

uint8_t* get_node_context() override { return uintptr_cast(this) + sizeof(SimpleNode< K, V >); }
Expand Down
3 changes: 3 additions & 0 deletions src/include/homestore/btree/detail/varlen_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ class VariableNode : public BtreeNode {
}

std::string to_string_keys(bool print_friendly = false) const override {
#if 0
std::string delimiter = print_friendly ? "\n" : "\t";
auto str = fmt::format("{}{} nEntries={} {} ",
print_friendly ? "------------------------------------------------------------\n" : "",
Expand Down Expand Up @@ -570,6 +571,8 @@ class VariableNode : public BtreeNode {
fmt::format_to(std::back_inserter(str), "-{}]", cur_key);
}
return str;
#endif
return {};
}

uint8_t* get_node_context() override { return uintptr_cast(this) + sizeof(VariableNode< K, V >); }
Expand Down
9 changes: 4 additions & 5 deletions src/include/homestore/homestore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ class MetaBlkService;
class LogStoreService;
class BlkDataService;
class IndexService;
class ReplicationServiceImpl;
class ReplicationService;
class IndexServiceCallbacks;
class ReplServiceCallbacks;
struct vdev_info;
class HomeStore;
class CPManager;
Expand Down Expand Up @@ -114,7 +113,7 @@ class HomeStore {
std::unique_ptr< MetaBlkService > m_meta_service;
std::unique_ptr< LogStoreService > m_log_service;
std::unique_ptr< IndexService > m_index_service;
std::unique_ptr< ReplicationServiceImpl > m_repl_service;
std::unique_ptr< ReplicationService > m_repl_service;

std::unique_ptr< DeviceManager > m_dev_mgr;
shared< sisl::logging::logger_t > m_periodic_logger;
Expand Down Expand Up @@ -144,7 +143,7 @@ class HomeStore {
HomeStore& with_data_service(cshared< ChunkSelector >& custom_chunk_selector = nullptr);
HomeStore& with_log_service();
HomeStore& with_index_service(std::unique_ptr< IndexServiceCallbacks > cbs);
HomeStore& with_repl_data_service(repl_impl_type repl_type, std::unique_ptr< ReplServiceCallbacks > cbs,
HomeStore& with_repl_data_service(repl_impl_type repl_type,
cshared< ChunkSelector >& custom_chunk_selector = nullptr);

bool start(const hs_input_params& input, hs_before_services_starting_cb_t svcs_starting_cb = nullptr);
Expand All @@ -165,7 +164,7 @@ class HomeStore {
MetaBlkService& meta_service() { return *m_meta_service; }
LogStoreService& logstore_service() { return *m_log_service; }
IndexService& index_service() { return *m_index_service; }
ReplicationServiceImpl& repl_service() { return *m_repl_service; }
ReplicationService& repl_service() { return *m_repl_service; }
DeviceManager* device_mgr() { return m_dev_mgr.get(); }
ResourceMgr& resource_mgr() { return *m_resource_mgr.get(); }
CPManager& cp_mgr() { return *m_cp_mgr.get(); }
Expand Down
3 changes: 3 additions & 0 deletions src/include/homestore/homestore_decl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ using unique = std::unique_ptr< T >;
template < typename T >
using intrusive = boost::intrusive_ptr< T >;

template < typename T >
using cintrusive = const boost::intrusive_ptr< T >;

////////////// All Size Limits ///////////////////
constexpr uint32_t BLK_NUM_BITS{32};
constexpr uint32_t NBLKS_BITS{8};
Expand Down
2 changes: 2 additions & 0 deletions src/include/homestore/index/wb_cache_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ struct CPContext;

class IndexWBCacheBase {
public:
virtual ~IndexWBCacheBase() = default;

/// @brief Allocate the buffer and initialize the btree node. It adds the node to the wb cache.
/// @tparam K Key type of the Index
/// @param node_initializer Callback to be called upon which buffer is turned into btree node
Expand Down
57 changes: 46 additions & 11 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
@@ -1,18 +1,45 @@
#pragma once

// #include <nuraft_mesg/messaging_if.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <sisl/fds/buffer.hpp>

#include <homestore/replication/repl_decls.h>

namespace homestore {
class ReplDev;

struct repl_journal_entry;
struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::thread_safe_counter > {
friend class SoloReplDev;

public:
virtual ~repl_req_ctx();
int64_t get_lsn() const { return lsn; }

private:
sisl::blob header; // User header
sisl::blob key; // Key to replicate
sisl::sg_list value; // Raw value - applicable only to leader req
MultiBlkId local_blkid; // List of corresponding local blkids for the value
RemoteBlkId remote_blkid; // List of remote blkid for the value
std::unique_ptr< uint8_t[] > journal_buf; // Buf for the journal entry
repl_journal_entry* journal_entry{nullptr}; // pointer to the journal entry
int64_t lsn{0}; // Lsn for this replication req

void alloc_journal_entry(uint32_t size);
};

//
// Callbacks to be implemented by ReplDev users.
//
class ReplDevListener {
public:
virtual ~ReplDevListener() = default;

void set_repl_dev(ReplDev* rdev) { m_repl_dev = std::move(rdev); }
virtual ReplDev* repl_dev() { return m_repl_dev; }

/// @brief Called when the log entry has been committed in the replica set.
///
/// This function is called from a dedicated commit thread which is different from the original thread calling
Expand All @@ -22,10 +49,10 @@ class ReplDevListener {
/// @param header - Header originally passed with replica_set::write() api
/// @param key - Key originally passed with replica_set::write() api
/// @param blkids - List of blkids where data is written to the storage engine.
/// @param ctx - User contenxt passed as part of the replica_set::write() api
/// @param ctx - Context passed as part of the replica_set::write() api
///
virtual void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids,
void* ctx) = 0;
cintrusive< repl_req_ctx >& ctx) = 0;

/// @brief Called when the log entry has been received by the replica dev.
///
Expand All @@ -44,8 +71,9 @@ class ReplDevListener {
/// @param lsn - The log sequence number
/// @param header - Header originally passed with repl_dev::write() api
/// @param key - Key originally passed with repl_dev::write() api
/// @param ctx - User contenxt passed as part of the repl_dev::write() api
virtual void on_pre_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx) = 0;
/// @param ctx - Context passed as part of the replica_set::write() api
virtual bool on_pre_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
cintrusive< repl_req_ctx >& ctx) = 0;

/// @brief Called when the log entry has been rolled back by the replica set.
///
Expand All @@ -59,21 +87,26 @@ class ReplDevListener {
/// @param lsn - The log sequence number getting rolled back
/// @param header - Header originally passed with repl_dev::write() api
/// @param key - Key originally passed with repl_dev::write() api
/// @param ctx - User contenxt passed as part of the repl_dev::write() api
virtual void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, void* ctx) = 0;
/// @param ctx - Context passed as part of the replica_set::write() api
virtual void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
cintrusive< repl_req_ctx >& ctx) = 0;

/// @brief Called when replication module is trying to allocate a block to write the value
///
/// This function can be called both on leader and follower when it is trying to allocate a block to write the
/// value. Caller is expected to provide hints for allocation based on the header supplied as part of original
/// write. In cases where caller don't care about the hints can return default blk_alloc_hints.
///
/// @param header Header originally passed with repl_dev::write() api on the leader
/// @param header Header originally passed with repl_dev::async_alloc_write() api on the leader
/// @param Original context passed as part of repl_dev::async_alloc_write
/// @return Expected to return blk_alloc_hints for this write
virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, void* user_ctx) = 0;
virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, cintrusive< repl_req_ctx >& ctx) = 0;

/// @brief Called when the replica set is being stopped
virtual void on_replica_stop() = 0;

private:
ReplDev* m_repl_dev;
};

class ReplDev {
Expand All @@ -98,10 +131,10 @@ class ReplDev {
/// cases
/// @param value - vector of io buffers that contain value for the key. It is an optional field and if the value
/// list size is 0, then only key is written to replicadev without data.
/// @param user_ctx - User supplied opaque context which will be passed to listener
/// @param ctx - User supplied context which will be passed to listener
/// callbacks
virtual void async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value,
void* user_ctx) = 0;
intrusive< repl_req_ctx > ctx) = 0;

/// @brief Reads the data and returns a future to continue on
/// @param bid Block id to read
Expand Down Expand Up @@ -130,6 +163,8 @@ class ReplDev {

virtual void attach_listener(std::unique_ptr< ReplDevListener > listener) { m_listener = std::move(listener); }

virtual uint32_t get_blk_size() const = 0;

protected:
std::unique_ptr< ReplDevListener > m_listener;
};
Expand Down
49 changes: 34 additions & 15 deletions src/include/homestore/replication_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,34 +36,53 @@ template < typename V, typename E >
using Result = folly::Expected< V, E >;

template < class V, class E >
using AsyncResult = folly::SemiFuture< Result< V, E > >;
using AsyncResult = folly::Future< Result< V, E > >;

template < class V >
using ReplResult = Result< V, ReplServiceError >;

template < class V >
using AsyncReplResult = AsyncResult< V, ReplServiceError >;

class ReplServiceCallbacks {
public:
virtual ~ReplServiceCallbacks() = default;
virtual std::unique_ptr< ReplDevListener > on_repl_dev_init(cshared< ReplDev >& rs) = 0;
};

class ReplicationService {
public:
ReplicationService() = default;
virtual ~ReplicationService() = default;

/// Sync APIs
virtual ReplResult< shared< ReplDev > > get_replica_dev(uuid_t group_id) const = 0;
virtual void iterate_replica_devs(std::function< void(cshared< ReplDev >&) > const& cb) = 0;
/// @brief Creates the Repl Device to which eventually user can read locally and write to the quorom of the members
/// @param group_id Unique ID indicating the group. This is the key for several lookup structures
/// @param members List of members to form this group
/// @param listener state machine listener of all the events happening on the repl_dev (commit, precommit etc)
/// @return A Future ReplDev on success or Future ReplServiceError upon error
virtual AsyncReplResult< shared< ReplDev > > create_repl_dev(uuid_t group_id,
std::set< std::string, std::less<> >&& members,
std::unique_ptr< ReplDevListener > listener) = 0;

/// @brief Opens the Repl Device for a given group id. It is expected that the repl dev is already created and used
/// this method for recovering. It is possible that repl_dev is not ready and in that case it will provide Repl
/// Device after it is ready and thus returns a Future.
///
/// NOTE 1: If callers does an open for a repl device which was not created before, then at the end of
/// initialization an error is returned saying ReplServiceError::SERVER_NOT_FOUND
///
/// NOTE 2: If the open repl device is called after Replication service is started, then it returns an error
/// ReplServiceError::BAD_REQUEST
/// @param group_id Group id to open the repl device with
/// @param listener state machine listener of all the events happening on the repl_dev (commit, precommit etc)
/// @return A Future ReplDev on successful open of ReplDev or Future ReplServiceError upon error
virtual AsyncReplResult< shared< ReplDev > > open_repl_dev(uuid_t group_id,
std::unique_ptr< ReplDevListener > listener) = 0;

virtual folly::Future< ReplServiceError > replace_member(uuid_t group_id, std::string const& member_out,
std::string const& member_in) const = 0;

/// Async APIs
virtual AsyncReplResult< shared< ReplDev > > create_replica_dev(uuid_t group_id,
std::set< std::string, std::less<> >&& members) = 0;
/// @brief Get the repl dev for a given group id if it is already created or opened
/// @param group_id Group id interested in
/// @return ReplDev is opened or ReplServiceError::SERVER_NOT_FOUND if it doesn't exist
virtual ReplResult< shared< ReplDev > > get_repl_dev(uuid_t group_id) const = 0;

virtual folly::SemiFuture< ReplServiceError > replace_member(uuid_t group_id, std::string const& member_out,
std::string const& member_in) const = 0;
/// @brief Iterate over all repl devs and then call the callback provided
/// @param cb Callback with repl dev
virtual void iterate_repl_devs(std::function< void(cshared< ReplDev >&) > const& cb) = 0;
};
} // namespace homestore
8 changes: 8 additions & 0 deletions src/lib/blkalloc/blk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ void MultiBlkId::deserialize(sisl::blob const& b, bool copy) {
}
}

#if 0
static uint32_t MultiBlkId::expected_serialized_size(uint16_t num_pieces) {
uint32_t sz = BlkId::expected_serialized_size();
if (num_pieces > 1) { sz += sizeof(uint16_t) + ((num_pieces - 1) * sizeof(chain_blkid)); }
return sz;
}
#endif

uint16_t MultiBlkId::num_pieces() const { return BlkId::is_valid() ? n_addln_piece + 1 : 0; }

bool MultiBlkId::has_room() const { return (n_addln_piece < max_addln_pieces); }
Expand Down
Loading

0 comments on commit b9a77f2

Please sign in to comment.