Skip to content

Commit

Permalink
Initial setup to get the raft based replication in on subsequent PRs (e…
Browse files Browse the repository at this point in the history
  • Loading branch information
hkadayam authored Nov 13, 2023
1 parent 49a3e94 commit edf0299
Show file tree
Hide file tree
Showing 37 changed files with 665 additions and 422 deletions.
3 changes: 2 additions & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def build_requirements(self):

def requirements(self):
self.requires("iomgr/[~=10, include_prerelease=True]@oss/master")
self.requires("sisl/[~=10, include_prerelease=True]@oss/master")
self.requires("sisl/[~=11, include_prerelease=True]@oss/master")
self.requires("nuraft_mesg/[~=2, include_prerelease=True]@oss/main")

self.requires("farmhash/cci.20190513@")
self.requires("isa-l/2.30.0")
Expand Down
1 change: 1 addition & 0 deletions src/include/homestore/blk.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ struct MultiBlkId : public BlkId {
BlkId to_single_blkid() const;

static uint32_t expected_serialized_size(uint16_t num_pieces);
static uint32_t max_serialized_size();
static int compare(MultiBlkId const& one, MultiBlkId const& two);

struct iterator {
Expand Down
8 changes: 4 additions & 4 deletions src/include/homestore/btree/btree_kv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,17 +256,17 @@ class BtreeLinkInfo : public BtreeValue {

sisl::blob serialize() const override {
sisl::blob b;
b.size = sizeof(bnode_link_info);
b.bytes = uintptr_cast(const_cast< bnode_link_info* >(&info));
b.set_size(sizeof(bnode_link_info));
b.set_bytes(r_cast< const uint8_t* >(&info));
return b;
}
uint32_t serialized_size() const override { return sizeof(bnode_link_info); }
static uint32_t get_fixed_size() { return sizeof(bnode_link_info); }
std::string to_string() const override { return fmt::format("{}.{}", info.m_bnodeid, info.m_link_version); }

void deserialize(const sisl::blob& b, bool copy) override {
DEBUG_ASSERT_EQ(b.size, sizeof(bnode_link_info), "BtreeLinkInfo deserialize received invalid blob");
auto other = r_cast< bnode_link_info* >(b.bytes);
DEBUG_ASSERT_EQ(b.size(), sizeof(bnode_link_info), "BtreeLinkInfo deserialize received invalid blob");
auto other = r_cast< bnode_link_info const* >(b.cbytes());
set_bnode_id(other->m_bnodeid);
set_link_version(other->m_link_version);
}
Expand Down
6 changes: 3 additions & 3 deletions src/include/homestore/btree/detail/btree_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,9 @@ class BtreeNode : public sisl::ObjLifeCounter< BtreeNode > {
virtual BtreeLinkInfo get_edge_value() const { return BtreeLinkInfo{edge_id(), edge_link_version()}; }

virtual void set_edge_value(const BtreeValue& v) {
const auto b = v.serialize();
auto l = r_cast< BtreeLinkInfo::bnode_link_info* >(b.bytes);
DEBUG_ASSERT_EQ(b.size, sizeof(BtreeLinkInfo::bnode_link_info));
auto const b = v.serialize();
auto const l = r_cast< BtreeLinkInfo::bnode_link_info const* >(b.cbytes());
DEBUG_ASSERT_EQ(b.size(), sizeof(BtreeLinkInfo::bnode_link_info));
set_edge_info(*l);
}

Expand Down
29 changes: 14 additions & 15 deletions src/include/homestore/btree/detail/prefix_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,20 @@ class FixedPrefixNode : public VariantNode< K, V > {
sisl::blob const kblob = s_cast< K const& >(key).serialize_prefix();
sisl::blob const vblob = s_cast< V const& >(val).serialize_prefix();

DEBUG_ASSERT_EQ(kblob.size, key_size(), "Prefix key size mismatch with serialized prefix size");
DEBUG_ASSERT_EQ(vblob.size, value_size(), "Prefix value size mismatch with serialized prefix size");
DEBUG_ASSERT_EQ(kblob.size(), key_size(), "Prefix key size mismatch with serialized prefix size");
DEBUG_ASSERT_EQ(vblob.size(), value_size(), "Prefix value size mismatch with serialized prefix size");

uint8_t* cur_ptr = uintptr_cast(this) + sizeof(prefix_entry);
std::memcpy(cur_ptr, kblob.bytes, kblob.size);
cur_ptr += kblob.size;
std::memcpy(cur_ptr, vblob.bytes, vblob.size);
std::memcpy(cur_ptr, kblob.cbytes(), kblob.size());
cur_ptr += kblob.size();
std::memcpy(cur_ptr, vblob.cbytes(), vblob.size());
}
}

sisl::blob key_buf() const {
return sisl::blob{const_cast< uint8_t* >(r_cast< uint8_t const* >(this) + sizeof(prefix_entry)),
key_size()};
return sisl::blob{r_cast< uint8_t const* >(this) + sizeof(prefix_entry), key_size()};
}
sisl::blob val_buf() const { return sisl::blob{key_buf().bytes + key_buf().size, value_size()}; }
sisl::blob val_buf() const { return sisl::blob{key_buf().cbytes() + key_buf().size(), value_size()}; }
};

struct suffix_entry {
Expand Down Expand Up @@ -131,19 +130,19 @@ class FixedPrefixNode : public VariantNode< K, V > {
kblob = key.serialize();
vblob = val.serialize();
}
DEBUG_ASSERT_EQ(kblob.size, key_size(), "Suffix key size mismatch with serialized suffix size");
DEBUG_ASSERT_EQ(vblob.size, value_size(), "Suffix value size mismatch with serialized suffix size");
DEBUG_ASSERT_EQ(kblob.size(), key_size(), "Suffix key size mismatch with serialized suffix size");
DEBUG_ASSERT_EQ(vblob.size(), value_size(), "Suffix value size mismatch with serialized suffix size");

std::memcpy(cur_ptr, kblob.bytes, kblob.size);
cur_ptr += kblob.size;
std::memcpy(cur_ptr, vblob.bytes, vblob.size);
std::memcpy(cur_ptr, kblob.cbytes(), kblob.size());
cur_ptr += kblob.size();
std::memcpy(cur_ptr, vblob.cbytes(), vblob.size());
}

sisl::blob key_buf() const {
return sisl::blob{const_cast< uint8_t* >(r_cast< uint8_t const* >(this) + sizeof(suffix_entry)),
key_size()};
}
sisl::blob val_buf() const { return sisl::blob{key_buf().bytes + key_buf().size, value_size()}; }
sisl::blob val_buf() const { return sisl::blob{key_buf().bytes() + key_buf().size(), value_size()}; }
};
#pragma pack()

Expand Down Expand Up @@ -778,7 +777,7 @@ class FixedPrefixNode : public VariantNode< K, V > {
K prevKey;
while (i < this->total_entries()) {
K key = BtreeNode::get_nth_key< K >(i, false);
uint64_t kp = *(uint64_t*)key.serialize().bytes;
uint64_t kp = *(uint64_t*)key.serialize().bytes();
if (i > 0 && prevKey.compare(key) > 0) {
DEBUG_ASSERT(false, "Found non sorted entry: {} -> {}", kp, to_string());
}
Expand Down
22 changes: 10 additions & 12 deletions src/include/homestore/btree/detail/simple_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,7 @@ class SimpleNode : public VariantNode< K, V > {

void get_nth_key_internal(uint32_t ind, BtreeKey& out_key, bool copy) const override {
DEBUG_ASSERT_LT(ind, this->total_entries(), "node={}", to_string());
sisl::blob b;
b.bytes = (uint8_t*)(this->node_data_area_const() + (get_nth_obj_size(ind) * ind));
b.size = get_nth_key_size(ind);
sisl::blob b{this->node_data_area_const() + (get_nth_obj_size(ind) * ind), get_nth_key_size(ind)};
out_key.deserialize(b, copy);
}

Expand Down Expand Up @@ -324,11 +322,11 @@ class SimpleNode : public VariantNode< K, V > {
set_nth_value(ind, v);
} else {
uint8_t* entry = this->node_data_area() + (get_nth_obj_size(ind) * ind);
sisl::blob key_blob = k.serialize();
memcpy((void*)entry, key_blob.bytes, key_blob.size);
sisl::blob const key_blob = k.serialize();
memcpy((void*)entry, key_blob.cbytes(), key_blob.size());

sisl::blob val_blob = v.serialize();
memcpy((void*)(entry + key_blob.size), val_blob.bytes, val_blob.size);
sisl::blob const val_blob = v.serialize();
memcpy((void*)(entry + key_blob.size()), val_blob.cbytes(), val_blob.size());
}
}

Expand All @@ -345,20 +343,20 @@ class SimpleNode : public VariantNode< K, V > {

void set_nth_key(uint32_t ind, BtreeKey* key) {
uint8_t* entry = this->node_data_area() + (get_nth_obj_size(ind) * ind);
sisl::blob b = key->serialize();
memcpy(entry, b.bytes, b.size);
sisl::blob const b = key->serialize();
memcpy(entry, b.cbytes(), b.size());
}

void set_nth_value(uint32_t ind, const BtreeValue& v) {
sisl::blob b = v.serialize();
if (ind >= this->total_entries()) {
RELEASE_ASSERT_EQ(this->is_leaf(), false, "setting value outside bounds on leaf node");
DEBUG_ASSERT_EQ(b.size, sizeof(BtreeLinkInfo::bnode_link_info),
DEBUG_ASSERT_EQ(b.size(), sizeof(BtreeLinkInfo::bnode_link_info),
"Invalid value size being set for non-leaf node");
this->set_edge_info(*r_cast< BtreeLinkInfo::bnode_link_info* >(b.bytes));
this->set_edge_info(*r_cast< BtreeLinkInfo::bnode_link_info const* >(b.cbytes()));
} else {
uint8_t* entry = this->node_data_area() + (get_nth_obj_size(ind) * ind) + get_nth_key_size(ind);
std::memcpy(entry, b.bytes, b.size);
std::memcpy(entry, b.cbytes(), b.size());
}
}
};
Expand Down
60 changes: 25 additions & 35 deletions src/include/homestore/btree/detail/varlen_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class VariableNode : public VariantNode< K, V > {
K prevKey;
while (i < this->total_entries()) {
K key = BtreeNode::get_nth_key< K >(i, false);
uint64_t kp = *(uint64_t*)key.serialize().bytes;
uint64_t kp = *(uint64_t*)key.serialize().bytes();
if (i > 0 && prevKey.compare(key) > 0) {
DEBUG_ASSERT(false, "Found non sorted entry: {} -> {}", kp, to_string());
}
Expand Down Expand Up @@ -136,16 +136,16 @@ class VariableNode : public VariantNode< K, V > {
sisl::blob kblob = key.serialize();
sisl::blob vblob = val.serialize();

DEBUG_ASSERT_EQ(kblob.size, key.serialized_size(),
DEBUG_ASSERT_EQ(kblob.size(), key.serialized_size(),
"Key Serialized size returned different after serialization");
DEBUG_ASSERT_EQ(vblob.size, val.serialized_size(),
DEBUG_ASSERT_EQ(vblob.size(), val.serialized_size(),
"Value Serialized size returned different after serialization");

// we can avoid memcpy if addresses of val_ptr and vblob.bytes is same. In place update
if (key_ptr != kblob.bytes) { std::memcpy(key_ptr, kblob.bytes, kblob.size); }
if (val_ptr != vblob.bytes) { std::memcpy(val_ptr, vblob.bytes, vblob.size); }
set_nth_key_len(get_nth_record_mutable(ind), kblob.size);
set_nth_value_len(get_nth_record_mutable(ind), vblob.size);
if (key_ptr != kblob.cbytes()) { std::memcpy(key_ptr, kblob.cbytes(), kblob.size()); }
if (val_ptr != vblob.cbytes()) { std::memcpy(val_ptr, vblob.cbytes(), vblob.size()); }
set_nth_key_len(get_nth_record_mutable(ind), kblob.size());
set_nth_value_len(get_nth_record_mutable(ind), vblob.size());
get_var_node_header()->m_available_space += cur_obj_size - new_obj_size;
this->inc_gen();
} else {
Expand Down Expand Up @@ -224,13 +224,8 @@ class VariableNode : public VariantNode< K, V > {
bool full_move{false};
while (ind >= end_ind) {
// Get the ith key and value blob and then remove the entry from here and insert to the other node
sisl::blob kb;
kb.bytes = (uint8_t*)get_nth_obj(ind);
kb.size = get_nth_key_size(ind);

sisl::blob vb;
vb.bytes = kb.bytes + kb.size;
vb.size = get_nth_value_size(ind);
sisl::blob const kb{get_nth_obj(ind), get_nth_key_size(ind)};
sisl::blob const vb{kb.cbytes() + kb.size(), get_nth_value_size(ind)};

auto sz = other.insert(0, kb, vb);
if (!sz) { break; }
Expand Down Expand Up @@ -265,15 +260,10 @@ class VariableNode : public VariantNode< K, V > {

uint32_t ind = this->total_entries() - 1;
while (ind > 0) {
sisl::blob kb;
kb.bytes = (uint8_t*)get_nth_obj(ind);
kb.size = get_nth_key_size(ind);

sisl::blob vb;
vb.bytes = kb.bytes + kb.size;
vb.size = get_nth_value_size(ind);
sisl::blob const kb{get_nth_obj(ind), get_nth_key_size(ind)};
sisl::blob const vb{kb.cbytes() + kb.size(), get_nth_value_size(ind)};

if ((kb.size + vb.size + this->get_record_size()) > size_to_move) {
if ((kb.size() + vb.size() + this->get_record_size()) > size_to_move) {
// We reached threshold of how much we could move
break;
}
Expand Down Expand Up @@ -322,11 +312,11 @@ class VariableNode : public VariantNode< K, V > {
auto idx = start_idx;
uint32_t n = 0;
while (idx < other.total_entries()) {
sisl::blob kb{(uint8_t*)other.get_nth_obj(idx), other.get_nth_key_size(idx)};
sisl::blob vb{kb.bytes + kb.size, other.get_nth_value_size(idx)};
sisl::blob const kb{(uint8_t*)other.get_nth_obj(idx), other.get_nth_key_size(idx)};
sisl::blob const vb{kb.cbytes() + kb.size(), other.get_nth_value_size(idx)};

// We reached threshold of how much we could move
if ((kb.size + vb.size + other.get_record_size()) > copy_size) { break; }
if ((kb.size() + vb.size() + other.get_record_size()) > copy_size) { break; }

auto sz = insert(this->total_entries(), kb, vb);
if (sz == 0) { break; }
Expand All @@ -352,8 +342,8 @@ class VariableNode : public VariantNode< K, V > {
auto idx = start_idx;
uint32_t n = 0;
while (n < nentries) {
sisl::blob kb{(uint8_t*)other.get_nth_obj(idx), other.get_nth_key_size(idx)};
sisl::blob vb{kb.bytes + kb.size, other.get_nth_value_size(idx)};
sisl::blob const kb{other.get_nth_obj(idx), other.get_nth_key_size(idx)};
sisl::blob const vb{kb.cbytes() + kb.size(), other.get_nth_value_size(idx)};

auto sz = insert(this->total_entries(), kb, vb);
if (sz == 0) { break; }
Expand Down Expand Up @@ -461,8 +451,8 @@ class VariableNode : public VariantNode< K, V > {
void set_nth_key(uint32_t ind, const BtreeKey& key) {
const auto kb = key.serialize();
assert(ind < this->total_entries());
assert(kb.size == get_nth_key_size(ind));
memcpy(uintptr_cast(get_nth_obj(ind)), kb.bytes, kb.size);
assert(kb.size() == get_nth_key_size(ind));
memcpy(uintptr_cast(get_nth_obj(ind)), kb.cbytes(), kb.size());
}

bool has_room_for_put(btree_put_type put_type, uint32_t key_size, uint32_t value_size) const override {
Expand Down Expand Up @@ -587,7 +577,7 @@ class VariableNode : public VariantNode< K, V > {
assert(ind <= this->total_entries());
LOGTRACEMOD(btree, "{}:{}:{}:{}", ind, get_var_node_header()->tail_offset(), get_arena_free_space(),
get_var_node_header()->available_space());
uint16_t obj_size = key_blob.size + val_blob.size;
uint16_t obj_size = key_blob.size() + val_blob.size();
uint16_t to_insert_size = obj_size + this->get_record_size();
if (to_insert_size > get_var_node_header()->available_space()) {
RELEASE_ASSERT(false, "insert failed insert size {} available size {}", to_insert_size,
Expand All @@ -613,15 +603,15 @@ class VariableNode : public VariantNode< K, V > {
get_var_node_header()->m_available_space -= (obj_size + this->get_record_size());

// Create a new record
set_nth_key_len(rec_ptr, key_blob.size);
set_nth_value_len(rec_ptr, val_blob.size);
set_nth_key_len(rec_ptr, key_blob.size());
set_nth_value_len(rec_ptr, val_blob.size());
set_record_data_offset(rec_ptr, get_var_node_header()->m_tail_arena_offset);

// Copy the contents of key and value in the offset
uint8_t* raw_data_ptr = offset_to_ptr_mutable(get_var_node_header()->m_tail_arena_offset);
memcpy(raw_data_ptr, key_blob.bytes, key_blob.size);
raw_data_ptr += key_blob.size;
memcpy(raw_data_ptr, val_blob.bytes, val_blob.size);
memcpy(raw_data_ptr, key_blob.cbytes(), key_blob.size());
raw_data_ptr += key_blob.size();
memcpy(raw_data_ptr, val_blob.cbytes(), val_blob.size());

// Increment the entries and generation number
this->inc_entries();
Expand Down
10 changes: 3 additions & 7 deletions src/include/homestore/homestore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class HomeStore;
class CPManager;
class VirtualDev;
class ChunkSelector;
class ReplDevListener;
class ReplApplication;

using HomeStoreSafePtr = std::shared_ptr< HomeStore >;

Expand Down Expand Up @@ -96,12 +98,6 @@ struct HS_SERVICE {
}
};

VENUM(repl_impl_type, uint8_t,
server_side, // Completely homestore controlled replication
client_assisted, // Client assisting in replication
solo // For single node - no replication
);

/*
* IO errors handling by homestore.
* Write error :- Reason :- Disk error, space full,btree node read fail
Expand Down Expand Up @@ -149,7 +145,7 @@ class HomeStore {
HomeStore& with_data_service(cshared< ChunkSelector >& custom_chunk_selector = nullptr);
HomeStore& with_log_service();
HomeStore& with_index_service(std::unique_ptr< IndexServiceCallbacks > cbs);
HomeStore& with_repl_data_service(repl_impl_type repl_type,
HomeStore& with_repl_data_service(cshared< ReplApplication >& repl_app,
cshared< ChunkSelector >& custom_chunk_selector = nullptr);

bool start(const hs_input_params& input, hs_before_services_starting_cb_t svcs_starting_cb = nullptr);
Expand Down
Loading

0 comments on commit edf0299

Please sign in to comment.