Skip to content

Commit

Permalink
SealShard state change in pre-commit
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Dec 5, 2023
1 parent f3ad82e commit 4dc0358
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 25 deletions.
13 changes: 9 additions & 4 deletions src/lib/homestore_backend/heap_chunk_selector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ void HeapChunkSelector::add_chunk_internal(const chunk_num_t chunkID) {
return;
}
const auto& chunk = m_chunks[chunkID];

{
std::lock_guard< std::mutex > l(m_defrag_mtx);
if (m_selected_chunks.find(chunkID) != m_selected_chunks.end()) return;
m_defrag_heap.emplace(chunk);
m_selected_chunks.erase(chunkID);
}

VChunk vchunk(chunk);
auto pdevID = vchunk.get_pdev_id();
// add this find here, since we don't want to call make_shared in try_emplace every time.
Expand All @@ -36,10 +44,6 @@ void HeapChunkSelector::add_chunk_internal(const chunk_num_t chunkID) {

auto& heapLock = it->second->mtx;
auto& heap = it->second->m_heap;
{
std::lock_guard< std::mutex > l(m_defrag_mtx);
m_defrag_heap.emplace(chunk);
}
std::lock_guard< std::mutex > l(heapLock);
heap.emplace(chunk);
}
Expand Down Expand Up @@ -171,6 +175,7 @@ void HeapChunkSelector::remove_chunk_from_defrag_heap(const chunk_num_t chunkID)
for (auto& c : chunks) {
m_defrag_heap.emplace(c);
}
m_selected_chunks.erase(chunkID);
}

void HeapChunkSelector::foreach_chunks(std::function< void(csharedChunk&) >&& cb) {
Expand Down
1 change: 1 addition & 0 deletions src/lib/homestore_backend/heap_chunk_selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class HeapChunkSelector : public homestore::ChunkSelector {
std::unordered_map< chunk_num_t, csharedChunk > m_chunks;

VChunkDefragHeap m_defrag_heap;
std::unordered_set< chunk_num_t > m_selected_chunks;
std::mutex m_defrag_mtx;

void add_chunk_internal(const chunk_num_t);
Expand Down
52 changes: 52 additions & 0 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,58 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
});
}

bool HSHomeObject::on_blob_put_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
if (hs_ctx != nullptr) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
}

if (ctx) { RELEASE_ASSERT(!ctx->promise_.isFulfilled(), "on_blob_put_pre_commit promise is already fulfilled"); }

auto msg_header = r_cast< ReplicationMessageHeader* >(header.bytes);
auto shard_id = msg_header->shard_id;

// used for test
if (smphSignal) smphSignal->acquire();

{
std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(shard_id);
RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info");
if ((shard_iter->second->get()->info).state == ShardInfo::State::SEALED) {
LOGE("Shard {} is sealed when pre_commit put_blob", shard_id);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::SEALED_SHARD)); }
// we return false here, so on_blob_put_commit will not be called.
// instead, on_blob_put_rollback will be called.
return false;
}
}
return true;
}

void HSHomeObject::on_blob_put_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
if (!hs_ctx) return;
auto msg_header = r_cast< ReplicationMessageHeader* >(header.bytes);
if (msg_header->corrupted()) {
LOGE("replication message header is corrupted with crc error, lsn:{}", lsn);
// TODO: stale blks will be left, GC will take care of it.
return;
}

auto& pg_id = msg_header->pg_id;
shared< homestore::ReplDev > repl_dev;
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
repl_dev = static_cast< HS_PG* >(iter->second.get())->repl_dev_;
}
RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");
repl_dev->async_free_blks(lsn, hs_ctx->get_local_blkid());
}

void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
const homestore::MultiBlkId& pbas,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
Expand Down
6 changes: 6 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ void HSHomeObject::init_timer_thread() {

// Runs the periodic work of this object; at present that is a single call to persist_pg_sb().
void HSHomeObject::trigger_timed_events() {
    persist_pg_sb();
}

void HSHomeObject::set_semaphore() { smphSignal = std::make_shared< std::binary_semaphore >(0); }

// Test-only hook: releases the semaphore installed by set_semaphore().
// A no-op when no semaphore has been installed.
void HSHomeObject::release_semaphore() {
    if (!smphSignal) { return; }
    smphSignal->release();
}

void HSHomeObject::register_homestore_metablk_callback() {
// register some callbacks for metadata recovery;
using namespace homestore;
Expand Down
22 changes: 20 additions & 2 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <memory>
#include <mutex>
#include <semaphore>

#include <homestore/homestore.hpp>
#include <homestore/index/index_table.hpp>
Expand Down Expand Up @@ -160,6 +161,10 @@ class HSHomeObject : public HomeObjectImpl {
shared< HeapChunkSelector > chunk_selector_;
iomgr::timer_handle_t ho_timer_thread_handle_;

// used for testing when we only have solo_repl_dev.
// TODO: remove or change this when we have raft_repl_dev.
shared< std::binary_semaphore > smphSignal;

private:
static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); }

Expand All @@ -173,6 +178,10 @@ class HSHomeObject : public HomeObjectImpl {
void update_shard_in_map(const ShardInfo& shard_info);
void do_shard_message_commit(int64_t lsn, ReplicationMessageHeader& header, homestore::MultiBlkId const& blkids,
sisl::blob value, cintrusive< homestore::repl_req_ctx >& hs_ctx);
bool do_shard_message_pre_commit(int64_t lsn, ReplicationMessageHeader& header, sisl::blob value,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
void do_shard_message_rollback(int64_t lsn, ReplicationMessageHeader& header, sisl::blob value,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
// recover part
void register_homestore_metablk_callback();
void on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie);
Expand All @@ -199,15 +208,19 @@ class HSHomeObject : public HomeObjectImpl {
shared< HeapChunkSelector > chunk_selector() { return chunk_selector_; }

bool on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);
cintrusive< homestore::repl_req_ctx >& hs_ctx);
void on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);
cintrusive< homestore::repl_req_ctx >& hs_ctx);
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);

// Blob manager related.
void on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx);
bool on_blob_put_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
void on_blob_put_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
void on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
homestore::blk_alloc_hints blob_put_get_blk_alloc_hints(sisl::blob const& header,
Expand All @@ -230,6 +243,11 @@ class HSHomeObject : public HomeObjectImpl {
void print_btree_index(pg_id_t pg_id);

void trigger_timed_events();

// used for testing when we only have solo_repl_dev.
// TODO: remove or change this when we have raft_repl_dev.
void set_semaphore();
void release_semaphore();
};

class BlobIndexServiceCallbacks : public homestore::IndexServiceCallbacks {
Expand Down
120 changes: 109 additions & 11 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,99 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const
return req->result();
}

bool HSHomeObject::on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
if (hs_ctx != nullptr) {
auto ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx);
return do_shard_message_pre_commit(lsn, *r_cast< ReplicationMessageHeader* >(header.bytes), ctx->hdr_buf_,
hs_ctx);
}
return true;
}

bool HSHomeObject::do_shard_message_pre_commit(int64_t lsn, ReplicationMessageHeader& header, sisl::blob value,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};
if (hs_ctx != nullptr) {
ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get();
}

if (ctx) {
RELEASE_ASSERT(!ctx->promise_.isFulfilled(), "do_shard_message_pre_commit promise is already fulfilled");
}

if (header.corrupted()) {
LOGW("replication message header is corrupted with crc error, lsn:{}", lsn);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
return false;
}

if (crc32_ieee(init_crc32, value.bytes, value.size) != header.payload_crc) {
// header & value is inconsistent;
LOGW("replication message header is inconsistent with value, lsn:{}", lsn);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
return false;
}

auto shard_info = deserialize_shard_info(r_cast< const char* >(value.bytes), value.size);
switch (header.msg_type) {
case ReplicationMessageType::SEAL_SHARD_MSG: {
// we can not release chunk here, since if rollback happens, we can not make sure we can get the same chunk.
// it might be selected by other creat_shard after we release it.
// we need to wait for the commit phase to release chunk;
update_shard_in_map(shard_info);
if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }
break;
}
default: {
break;
}
}

return true;
}

void HSHomeObject::on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
if (hs_ctx != nullptr) {
auto ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx);
do_shard_message_rollback(lsn, *r_cast< ReplicationMessageHeader* >(header.bytes), ctx->hdr_buf_, hs_ctx);
}
}

// Undoes the pre-commit effect of a shard replication message.
//
// For SEAL_SHARD_MSG the deserialized shard info is written back to the shard map with its state
// forced to OPEN. Other message types are left untouched. If the header fails its corruption check,
// or header.payload_crc does not match the crc32 of `value`, a warning is logged and nothing is
// rolled back.
//
// The promise is deliberately never completed here:
//   1. if the actual execution happens in pre_commit, the promise is completed in pre_commit;
//   2. if the actual execution happens in commit, the promise is completed in commit.
//
// @param lsn     log sequence number, used only in log messages
// @param header  replication message header describing `value`
// @param value   serialized shard info
// @param hs_ctx  request context (unused)
void HSHomeObject::do_shard_message_rollback(int64_t lsn, ReplicationMessageHeader& header, sisl::blob value,
                                             [[maybe_unused]] cintrusive< homestore::repl_req_ctx >& hs_ctx) {
    if (header.corrupted()) {
        LOGW("replication message header is corrupted with crc error, lsn:{}", lsn);
        return;
    }

    const bool payload_matches_header = (crc32_ieee(init_crc32, value.bytes, value.size) == header.payload_crc);
    if (!payload_matches_header) {
        // The header does not describe this payload.
        LOGW("replication message header is inconsistent with value, lsn:{}", lsn);
        return;
    }

    // TODO: what if pre_commit succeeded but the rollback fails (e.g. the log header is corrupted)?
    // It seems we would just crash in that case.

    // Deserialization happens for every message type, not only for seal.
    auto shard_info = deserialize_shard_info(r_cast< const char* >(value.bytes), value.size);

    if (header.msg_type == ReplicationMessageType::SEAL_SHARD_MSG) {
        // Put the shard back into the OPEN state in the shard map.
        shard_info.state = ShardInfo::State::OPEN;
        update_shard_in_map(shard_info);
    }
}

void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids,
homestore::ReplDev* repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
Expand Down Expand Up @@ -179,23 +272,30 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader
homestore::MultiBlkId const& blkids, sisl::blob value,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};

// for create_shard, we need to fulfill the promise;
// for seal_shard, the promise is already fulfilled in pre_commit;
bool ctxFullfilled{true};
if (hs_ctx != nullptr) {
ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get();
ctxFullfilled = ctx->promise_.isFulfilled();
}

if (header.corrupted()) {
LOGW("replication message header is corrupted with crc error, lsn:{}", lsn);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
if (!ctxFullfilled) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
return;
}

if (crc32_ieee(init_crc32, value.bytes, value.size) != header.payload_crc) {
// header & value is inconsistent;
LOGW("replication message header is inconsistent with value, lsn:{}", lsn);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
if (!ctxFullfilled) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
return;
}

// TODO:: for seal_shard, what if we pre_commit successfully , but commit failed(e.g., log header corrupted) ?

auto shard_info = deserialize_shard_info(r_cast< const char* >(value.bytes), value.size);
switch (header.msg_type) {
case ReplicationMessageType::CREATE_SHARD_MSG: {
Expand All @@ -214,7 +314,6 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader

break;
}

case ReplicationMessageType::SEAL_SHARD_MSG: {
ShardInfo::State state;
{
Expand All @@ -224,21 +323,20 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader
state = (*iter->second)->info.state;
}

if (state == ShardInfo::State::OPEN) {
auto chunk_id = get_shard_chunk(shard_info.id);
RELEASE_ASSERT(chunk_id.has_value(), "Chunk id not found");
chunk_selector()->release_chunk(chunk_id.value());
update_shard_in_map(shard_info);
}
RELEASE_ASSERT(state == ShardInfo::State::SEALED, "Shard should be sealed before commit");

break;
auto chunk_id = get_shard_chunk(shard_info.id);
RELEASE_ASSERT(chunk_id.has_value(), "Chunk id not found");
// when restarting, the chunk is not selected since the metablk indicates the shard is sealed.
// handle this in chunk selector
chunk_selector()->release_chunk(chunk_id.value());
}
default: {
break;
}
}

if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }
if (!ctxFullfilled) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }
}

void HSHomeObject::add_new_shard_to_map(ShardPtr&& shard) {
Expand Down
Loading

0 comments on commit 4dc0358

Please sign in to comment.