Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error handling on pre-append et al. #361

Merged
merged 7 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "5.1.15"
version = "6.0.1"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down Expand Up @@ -55,9 +55,9 @@ def build_requirements(self):
self.build_requires("gtest/1.14.0")

def requirements(self):
self.requires("iomgr/[~=11, include_prerelease=True]@oss/master")
self.requires("sisl/[~=11, include_prerelease=True]@oss/master")
self.requires("nuraft_mesg/[~=3, include_prerelease=True]@oss/main")
self.requires("iomgr/[~=11.2, include_prerelease=True]@oss/master")
self.requires("sisl/[~=12, include_prerelease=True]@oss/master")
self.requires("nuraft_mesg/[~=3.3, include_prerelease=True]@oss/main")

self.requires("farmhash/cci.20190513@")
if self.settings.arch in ['x86', 'x86_64']:
Expand Down
24 changes: 16 additions & 8 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <flatbuffers/flatbuffers.h>
#include <folly/futures/Future.h>
#include <sisl/fds/buffer.hpp>
#include <sisl/fds/utils.hpp>
#include <sisl/grpc/generic_service.hpp>
#include <homestore/replication/repl_decls.h>

Expand Down Expand Up @@ -54,6 +55,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
friend class SoloReplDev;

public:
repl_req_ctx() { start_time = Clock::now(); }
virtual ~repl_req_ctx();
int64_t get_lsn() const { return lsn; }
MultiBlkId const& get_local_blkid() const { return local_blkid; }
Expand All @@ -66,13 +68,15 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::

std::string to_string() const;
std::string to_compact_string() const;
Clock::time_point created_time() const { return start_time; }

public:
repl_key rkey; // Unique key for the request
sisl::blob header; // User header
sisl::blob key; // User supplied key for this req
int64_t lsn{-1}; // Lsn for this replication req
bool is_proposer{false}; // Is the repl_req proposed by this node
repl_key rkey; // Unique key for the request
sisl::blob header; // User header
sisl::blob key; // User supplied key for this req
int64_t lsn{-1}; // Lsn for this replication req
bool is_proposer{false}; // Is the repl_req proposed by this node
Clock::time_point start_time; // Start time of the request

//////////////// Value related section /////////////////
sisl::sg_list value; // Raw value - applicable only to leader req
Expand All @@ -83,10 +87,12 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
//////////////// Journal/Buf related section /////////////////
std::variant< std::unique_ptr< uint8_t[] >, raft_buf_ptr_t > journal_buf; // Buf for the journal entry
repl_journal_entry* journal_entry{nullptr}; // pointer to the journal entry
bool is_jentry_localize_pending{false}; // Is the journal entry needs to be localized from remote

//////////////// Replication state related section /////////////////
std::mutex state_mtx;
std::atomic< uint32_t > state{uint32_cast(repl_req_state_t::INIT)}; // State of the replication request
folly::Promise< folly::Unit > data_received_promise; // Promise to be fulfilled when data is received
folly::Promise< folly::Unit > data_written_promise; // Promise to be fulfilled when data is written

//////////////// Communication packet/builder section /////////////////
Expand Down Expand Up @@ -178,8 +184,10 @@ class ReplDevListener {
///
/// @param header Header originally passed with repl_dev::async_alloc_write() api on the leader
/// @param data_size Size needed to be allocated for
/// @return Expected to return blk_alloc_hints for this write
virtual blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) = 0;
/// @return Expected to return blk_alloc_hints for this write. If the hints are not available, then return the
error. It is to be noted this method should return error only in very abnormal cases as in some code flow, an
/// error would result in a crash or stall of the entire commit thread.
virtual ReplResult< blk_alloc_hints > get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) = 0;

/// @brief Called when the replica set is being stopped
virtual void on_replica_stop() = 0;
Expand Down Expand Up @@ -241,7 +249,7 @@ class ReplDev {
virtual bool is_leader() const = 0;

/// @brief get the leader replica_id of given group
virtual const replica_id_t get_leader_id() const = 0;
virtual replica_id_t get_leader_id() const = 0;

/// @brief get replication status. If called on follower member
/// this API can return empty result.
Expand Down
20 changes: 2 additions & 18 deletions src/lib/blkalloc/bitmap_blk_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,22 +134,6 @@ void BitmapBlkAllocator::free_on_disk(BlkId const& bid) {
BlkAllocPortion& portion = blknum_to_portion(b.blk_num());
{
auto lock{portion.portion_auto_lock()};
if (!hs()->is_initializing()) {
// During recovery we might try to free the entry which is already freed while replaying the
// journal, This assert is valid only post recovery.
if (!m_disk_bm->is_bits_set(b.blk_num(), b.blk_count())) {
BLKALLOC_LOG(ERROR, "bit not set {} nblks {} chunk number {}", b.blk_num(), b.blk_count(),
m_chunk_id);
for (blk_count_t i{0}; i < b.blk_count(); ++i) {
if (!m_disk_bm->is_bits_set(b.blk_num() + i, 1)) {
BLKALLOC_LOG(ERROR, "bit not set {}", b.blk_num() + i);
}
}
BLKALLOC_REL_ASSERT(m_disk_bm->is_bits_set(b.blk_num(), b.blk_count()),
"Expected disk bits to set blk num {} num blks {}", b.blk_num(), b.blk_count());
}
}

m_disk_bm->reset_bits(b.blk_num(), b.blk_count());
}
};
Expand All @@ -168,7 +152,7 @@ void BitmapBlkAllocator::free_on_disk(BlkId const& bid) {
sisl::byte_array BitmapBlkAllocator::acquire_underlying_buffer() {
// prepare a temporary alloc list, where blkalloc is accumulated till underlying buffer is released.
// RCU will wait for all I/Os that are still in critical section (allocating on disk bm) to complete and exit;
auto alloc_list_ptr = new sisl::ThreadVector< BlkId >();
auto alloc_list_ptr = new sisl::ThreadVector< MultiBlkId >();
auto old_alloc_list_ptr = rcu_xchg_pointer(&m_alloc_blkid_list, alloc_list_ptr);
synchronize_rcu();

Expand All @@ -195,7 +179,7 @@ void BitmapBlkAllocator::release_underlying_buffer() {
/* Get status */
nlohmann::json BitmapBlkAllocator::get_status(int) const { return nlohmann::json{}; }

sisl::ThreadVector< BlkId >* BitmapBlkAllocator::get_alloc_blk_list() {
sisl::ThreadVector< MultiBlkId >* BitmapBlkAllocator::get_alloc_blk_list() {
auto p = rcu_dereference(m_alloc_blkid_list);
return p;
}
Expand Down
4 changes: 2 additions & 2 deletions src/lib/blkalloc/bitmap_blk_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class BitmapBlkAllocator : public BlkAllocator {

private:
void do_init();
sisl::ThreadVector< BlkId >* get_alloc_blk_list();
sisl::ThreadVector< MultiBlkId >* get_alloc_blk_list();
void on_meta_blk_found(void* mblk_cookie, sisl::byte_view const& buf, size_t size);

// Acquire the underlying bitmap buffer and while the caller has acquired, all the new allocations
Expand All @@ -116,7 +116,7 @@ class BitmapBlkAllocator : public BlkAllocator {
blk_num_t m_blks_per_portion;

private:
sisl::ThreadVector< BlkId >* m_alloc_blkid_list{nullptr};
sisl::ThreadVector< MultiBlkId >* m_alloc_blkid_list{nullptr};
std::unique_ptr< BlkAllocPortion[] > m_blk_portions;
std::unique_ptr< sisl::Bitset > m_disk_bm{nullptr};
std::atomic< bool > m_is_disk_bm_dirty{true}; // initially disk_bm treated as dirty
Expand Down
9 changes: 4 additions & 5 deletions src/lib/blkalloc/blk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ void BlkId::invalidate() { s.m_nblks = 0; }
bool BlkId::is_valid() const { return (blk_count() > 0); }

std::string BlkId::to_string() const {
return is_valid() ? fmt::format("BlkNum={} nblks={} chunk={}", blk_num(), blk_count(), chunk_num())
: "Invalid_Blkid";
return is_valid() ? fmt::format("blk#={} count={} chunk={}", blk_num(), blk_count(), chunk_num()) : "Invalid_Blkid";
}

int BlkId::compare(const BlkId& one, const BlkId& two) {
Expand Down Expand Up @@ -126,12 +125,12 @@ bool MultiBlkId::has_room() const { return (n_addln_piece < max_addln_pieces); }
MultiBlkId::iterator MultiBlkId::iterate() const { return MultiBlkId::iterator{*this}; }

std::string MultiBlkId::to_string() const {
std::string str = "MultiBlks: {";
std::string str = "[";
auto it = iterate();
while (auto const b = it.next()) {
str += (b->to_string() + " ");
str += "{" + (b->to_string() + "},");
}
str += std::string("}");
str += std::string("]");
return str;
}

Expand Down
15 changes: 9 additions & 6 deletions src/lib/common/homestore_config.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,17 @@ table Consensus {
// Minimum log gap a replica has to be from leader before joining the replica set.
min_log_gap_to_join: int32 = 30;

// amount of time in seconds to wait on data write before fetch data from remote;
wait_data_write_timer_sec: uint32 = 30 (hotswap);
// amount of time in millis to wait on data write before fetching data from the remote;
wait_data_write_timer_ms: uint64 = 1500 (hotswap);

// Leadership expiry 120 seconds
leadership_expiry_ms: uint32 = 120000;
// Leadership expiry (=0 indicates 20 times heartbeat period), set -1 to never expire
leadership_expiry_ms: int32 = 0;
xiaoxichen marked this conversation as resolved.
Show resolved Hide resolved

// data fetch max size limit in MB
data_fetch_max_size_mb: uint32 = 2;
// data fetch max size limit in KB (2MB by default)
data_fetch_max_size_kb: uint32 = 2048;

// Timeout for data to be received after raft entry after which raft entry is rejected.
data_receive_timeout_ms: uint64 = 10000;
}

table HomeStoreSettings {
Expand Down
40 changes: 21 additions & 19 deletions src/lib/replication/log_store/home_raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,23 @@ static constexpr store_lsn_t to_store_lsn(uint64_t raft_lsn) { return s_cast< st
static constexpr store_lsn_t to_store_lsn(repl_lsn_t repl_lsn) { return repl_lsn - 1; }
static constexpr repl_lsn_t to_repl_lsn(store_lsn_t store_lsn) { return store_lsn + 1; }

static nuraft::ptr< nuraft::log_entry > to_nuraft_log_entry(const log_buffer& log_bytes) {
uint8_t const* raw_ptr = log_bytes.bytes();
static nuraft::ptr< nuraft::log_entry > to_nuraft_log_entry(sisl::blob const& log_blob) {
uint8_t const* raw_ptr = log_blob.cbytes();
uint64_t term = *r_cast< uint64_t const* >(raw_ptr);
raw_ptr += sizeof(uint64_t);
nuraft::log_val_type type = static_cast< nuraft::log_val_type >(*raw_ptr);
raw_ptr += sizeof(nuraft::log_val_type);

size_t data_len = log_bytes.size() - sizeof(uint64_t) - sizeof(nuraft::log_val_type);
size_t data_len = log_blob.size() - sizeof(uint64_t) - sizeof(nuraft::log_val_type);
auto nb = nuraft::buffer::alloc(data_len);
nb->put_raw(raw_ptr, data_len);
return nuraft::cs_new< nuraft::log_entry >(term, nb, type);
}

static nuraft::ptr< nuraft::log_entry > to_nuraft_log_entry(const log_buffer& log_bytes) {
return to_nuraft_log_entry(log_bytes.get_blob());
}

static uint64_t extract_term(const log_buffer& log_bytes) {
uint8_t const* raw_ptr = log_bytes.bytes();
return (*r_cast< uint64_t const* >(raw_ptr));
Expand Down Expand Up @@ -115,27 +119,23 @@ ulong HomeRaftLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
REPL_STORE_LOG(TRACE, "append entry term={}, log_val_type={} size={}", entry->get_term(),
static_cast< uint32_t >(entry->get_val_type()), entry->get_buf().size());
auto buf = entry->serialize();
return append(buf);
}

ulong HomeRaftLogStore::append(raft_buf_ptr_t& buffer) {
auto next_seq = m_log_store->append_async(
sisl::io_blob{buffer->data_begin(), uint32_cast(buffer->size()), false /* is_aligned */}, nullptr /* cookie */,
[buffer](int64_t, sisl::io_blob&, logdev_key, void*) {});
auto const next_seq =
m_log_store->append_async(sisl::io_blob{buf->data_begin(), uint32_cast(buf->size()), false /* is_aligned */},
nullptr /* cookie */, [buf](int64_t, sisl::io_blob&, logdev_key, void*) {});
return to_repl_lsn(next_seq);
}

void HomeRaftLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& entry) {
auto buf = entry->serialize();
write_at(index, buf);
}

void HomeRaftLogStore::write_at(ulong index, raft_buf_ptr_t& buffer) {
m_log_store->rollback_async(to_store_lsn(index) - 1, nullptr);

// we need to reset the durable lsn, because its ok to set to lower number as it will be updated on next flush
// calls, but it is dangerous to set higher number.
m_last_durable_lsn = -1;
append(buffer);

m_log_store->append_async(sisl::io_blob{buf->data_begin(), uint32_cast(buf->size()), false /* is_aligned */},
nullptr /* cookie */, [buf](int64_t, sisl::io_blob&, logdev_key, void*) {});
}

void HomeRaftLogStore::end_of_append_batch(ulong start, ulong cnt) {
Expand Down Expand Up @@ -232,12 +232,14 @@ void HomeRaftLogStore::apply_pack(ulong index, nuraft::buffer& pack) {

for (int i{0}; i < num_entries; ++i) {
size_t entry_len;
auto* entry = const_cast< nuraft::byte* >(pack.get_bytes(entry_len));
[[maybe_unused]] auto store_sn =
m_log_store->append_async(sisl::io_blob{entry, uint32_cast(entry_len), false}, nullptr, nullptr);
REPL_STORE_LOG(TRACE, "unpacking nth_entry={} of size={}, lsn={}", i + 1, entry_len, to_repl_lsn(store_sn));
auto* entry = pack.get_bytes(entry_len);
sisl::blob b{entry, uint32_cast(entry_len)};

auto nle = to_nuraft_log_entry(b);
this->append(nle);
REPL_STORE_LOG(TRACE, "unpacking nth_entry={} of size={}, lsn={}", i + 1, entry_len, slot + i);
}
m_log_store->flush_sync(to_store_lsn(index) + num_entries - 1);
this->end_of_append_batch(slot, num_entries);
}

bool HomeRaftLogStore::compact(ulong compact_lsn) {
Expand Down
29 changes: 8 additions & 21 deletions src/lib/replication/log_store/repl_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,10 @@ uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
// We don't want to transform anything that is not an app log
if (entry->get_val_type() != nuraft::log_val_type::app_log) { return HomeRaftLogStore::append(entry); }

repl_req_ptr_t rreq = m_sm.transform_journal_entry(entry);
ulong lsn;
if (rreq->is_proposer || rreq->value_inlined) {
// No need of any transformation for proposer or inline data, since the entry is already meaningful
lsn = HomeRaftLogStore::append(entry);
} else {
lsn = HomeRaftLogStore::append(rreq->raft_journal_buf());
}
repl_req_ptr_t rreq = m_sm.localize_journal_entry_finish(*entry);
ulong lsn = HomeRaftLogStore::append(entry);
m_sm.link_lsn_to_req(rreq, int64_cast(lsn));

RD_LOG(DEBUG, "Raft Channel: Received append log entry rreq=[{}]", rreq->to_compact_string());
return lsn;
}
Expand All @@ -30,13 +25,8 @@ void ReplLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& entry
return;
}

repl_req_ptr_t rreq = m_sm.transform_journal_entry(entry);
if (rreq->is_proposer || rreq->value_inlined) {
// No need of any transformation for proposer or inline data, since the entry is already meaningful
HomeRaftLogStore::write_at(index, entry);
} else {
HomeRaftLogStore::write_at(index, rreq->raft_journal_buf());
}
repl_req_ptr_t rreq = m_sm.localize_journal_entry_finish(*entry);
HomeRaftLogStore::write_at(index, entry);
m_sm.link_lsn_to_req(rreq, int64_cast(index));
RD_LOG(DEBUG, "Raft Channel: Received write_at log entry rreq=[{}]", rreq->to_compact_string());
}
Expand All @@ -51,10 +41,7 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
// Skip this call in proposer, since this method will synchronously flush the data, which is not required for
// leader. Proposer will call the flush as part of commit after receiving quorum, upon which time, there is a
// high possibility the log entry is already flushed.
if (rreq && rreq->is_proposer) {
RD_LOG(TRACE, "Raft Channel: Ignoring to flush proposer request rreq=[{}]", rreq->to_compact_string());
continue;
}
if (rreq && rreq->is_proposer) { continue; }
reqs->emplace_back(std::move(rreq));
}

Expand All @@ -74,9 +61,9 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
HomeRaftLogStore::end_of_append_batch(start_lsn, count);

// Wait for the fetch and write to be completed successfully.
std::move(fut).get();
std::move(fut).wait();

// Mark all the pbas also completely written
// Mark all the reqs also completely written
for (auto const& rreq : *reqs) {
if (rreq) { rreq->state.fetch_or(uint32_cast(repl_req_state_t::LOG_FLUSHED)); }
}
Expand Down
4 changes: 2 additions & 2 deletions src/lib/replication/repl_dev/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ std::string repl_req_ctx::to_string() const {
}

std::string repl_req_ctx::to_compact_string() const {
return fmt::format("dsn={} term={} lsn={} Blkid={} state=[{}]", rkey.dsn, rkey.term, lsn, local_blkid.to_string(),
req_state_name(state.load()));
return fmt::format("dsn={} term={} lsn={} local_blkid={} state=[{}]", rkey.dsn, rkey.term, lsn,
local_blkid.to_string(), req_state_name(state.load()));
}

} // namespace homestore
8 changes: 4 additions & 4 deletions src/lib/replication/repl_dev/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
#include <homestore/superblk_handler.hpp>

namespace homestore {
VENUM(journal_type_t, uint16_t, HS_LARGE_DATA = 0, HS_HEADER_ONLY = 1)
VENUM(journal_type_t, uint16_t, HS_DATA_LINKED = 0, HS_DATA_INLINED = 1)

#pragma pack(1)
struct repl_journal_entry {
static constexpr uint16_t JOURNAL_ENTRY_MAJOR = 1;
static constexpr uint16_t JOURNAL_ENTRY_MINOR = 1;
Expand All @@ -34,8 +35,8 @@ struct repl_journal_entry {
uint16_t minor_version{JOURNAL_ENTRY_MINOR};

journal_type_t code;
int32_t server_id;
uint64_t dsn; // Data seq number
int32_t server_id; // Server id from where journal entry is originated
uint64_t dsn; // Data seq number
uint32_t user_header_size;
uint32_t key_size;
uint32_t value_size;
Expand All @@ -53,7 +54,6 @@ struct repl_journal_entry {
}
};

#pragma pack(1)
struct repl_dev_superblk {
static constexpr uint64_t REPL_DEV_SB_MAGIC = 0xABCDF00D;
static constexpr uint32_t REPL_DEV_SB_VERSION = 1;
Expand Down
Loading
Loading