Skip to content

Commit

Permalink
SealShard state change in pre-commit
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Jan 18, 2024
1 parent c3a1459 commit f3a4ed9
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 28 deletions.
8 changes: 7 additions & 1 deletion src/lib/homestore_backend/heap_chunk_selector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ namespace homeobject {
// 2 the key collection of m_chunks will never change

// this should only be called when initializing HeapChunkSelector in Homestore
void HeapChunkSelector::add_chunk(csharedChunk& chunk) { m_chunks.emplace(VChunk(chunk).get_chunk_id(), chunk); }
// Registers `chunk` with the selector: stores it in m_chunks keyed by its chunk id and records the
// same id in m_selected_chunks.
void HeapChunkSelector::add_chunk(csharedChunk& chunk) {
    const auto id = VChunk(chunk).get_chunk_id();
    m_chunks.emplace(id, chunk);
    m_selected_chunks.insert(id);
}

void HeapChunkSelector::add_chunk_internal(const chunk_num_t chunkID, bool add_to_heap) {
if (m_chunks.find(chunkID) == m_chunks.end()) {
Expand Down Expand Up @@ -45,6 +49,7 @@ void HeapChunkSelector::add_chunk_internal(const chunk_num_t chunkID, bool add_t
{
std::lock_guard< std::mutex > l(m_defrag_mtx);
m_defrag_heap.emplace(chunk);
m_selected_chunks.erase(chunkID);
}
std::lock_guard< std::mutex > l(heapLock);
heap.emplace(chunk);
Expand Down Expand Up @@ -178,6 +183,7 @@ void HeapChunkSelector::remove_chunk_from_defrag_heap(const chunk_num_t chunkID)
for (auto& c : chunks) {
m_defrag_heap.emplace(c);
}
m_selected_chunks.emplace(chunkID);
}

void HeapChunkSelector::foreach_chunks(std::function< void(csharedChunk&) >&& cb) {
Expand Down
1 change: 1 addition & 0 deletions src/lib/homestore_backend/heap_chunk_selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class HeapChunkSelector : public homestore::ChunkSelector {
void add_chunk_internal(const chunk_num_t, bool add_to_heap = true);

VChunkDefragHeap m_defrag_heap;
std::unordered_set< chunk_num_t > m_selected_chunks;
std::mutex m_defrag_mtx;

void remove_chunk_from_defrag_heap(const chunk_num_t);
Expand Down
65 changes: 62 additions & 3 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,73 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
});
}

// Pre-commit hook for a put_blob replication request.
//
// Looks up the target shard (taken from the replication message header) and rejects the request if the
// shard is already SEALED. On rejection the caller's promise (when a request context is attached) is
// fulfilled with BlobError::SEALED_SHARD; otherwise the promise is left untouched for the commit path.
//
// @param lsn     log sequence number of the request; not used by this hook.
// @param header  buffer holding a ReplicationMessageHeader; only its shard_id is read here.
// @param key     not used by this hook.
// @param hs_ctx  replication request context; may be null. When non-null it is treated as a
//                repl_result_ctx< BlobManager::Result< BlobInfo > >.
// @return false when the shard is sealed, true otherwise.
bool HSHomeObject::on_blob_put_pre_commit([[maybe_unused]] int64_t lsn, sisl::blob const& header,
                                          [[maybe_unused]] sisl::blob const& key,
                                          cintrusive< homestore::repl_req_ctx >& hs_ctx) {
    repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
    if (hs_ctx != nullptr) {
        ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
    }

    if (ctx) { RELEASE_ASSERT(!ctx->promise_.isFulfilled(), "on_blob_put_pre_commit promise is already fulfilled"); }

    auto msg_header = r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes()));
    auto shard_id = msg_header->shard_id;

    // used for test
#ifndef NDEBUG
    if (smphSignal) smphSignal->acquire();
#endif

    // Only the state lookup needs _shard_lock. Logging and, more importantly, fulfilling the promise are
    // done after the lock is released so that whatever runs as a consequence of setValue() (continuations
    // may execute inline on this thread) never runs while _shard_lock is held.
    bool shard_sealed{false};
    {
        std::scoped_lock lock_guard(_shard_lock);
        auto shard_iter = _shard_map.find(shard_id);
        RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info");
        shard_sealed = ((shard_iter->second->get()->info).state == ShardInfo::State::SEALED);
    }

    if (!shard_sealed) { return true; }

    LOGE("Shard {} is sealed when pre_commit put_blob", shard_id);
    if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::SEALED_SHARD)); }

    // we return false here, so on_blob_put_commit will not be called.
    // instead, on_blob_put_rollback will be called.

    // TODO: solo_repl_dev does not check the return value of pre_commit. This logic should be added to
    // solo_repl_dev: if we get a false from pre_commit, we should call on_blob_put_rollback.
    return false;
}

void HSHomeObject::on_blob_put_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
if (!hs_ctx) return;
auto msg_header = r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes()));
if (msg_header->corrupted()) {
LOGE("replication message header is corrupted with crc error, lsn:{}", lsn);
// TODO: stale blks will be left, GC will take care of it.
return;
}

auto& pg_id = msg_header->pg_id;
shared< homestore::ReplDev > repl_dev;
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
repl_dev = static_cast< HS_PG* >(iter->second.get())->repl_dev_;
}
RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");
repl_dev->async_free_blks(lsn, hs_ctx->get_local_blkid());
}

void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
const homestore::MultiBlkId& pbas,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
if (hs_ctx != nullptr) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
if (ctx->promise_.isFulfilled()) {
LOGE("on_blob_put_commit promise is already fulfilled in pre_commit");
return;
}
}

auto msg_header = r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes()));
Expand Down Expand Up @@ -392,9 +453,7 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis
}

auto& multiBlks = r.value();
if (multiBlks != tombstone_pbas) {
repl_dev->async_free_blks(lsn, multiBlks);
}
if (multiBlks != tombstone_pbas) { repl_dev->async_free_blks(lsn, multiBlks); }

if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); }
}
Expand Down
8 changes: 7 additions & 1 deletion src/lib/homestore_backend/hs_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,13 @@ void HSHomeObject::init_cp() {
std::move(std::make_unique< HomeObjCPCallbacks >(this)));
}

// void HSHomeObject::trigger_timed_events() { persist_pg_sb(); }
#ifndef NDEBUG
// Debug-build test hook: installs a new binary semaphore, constructed with an initial value of 0,
// into smphSignal (replacing any previously installed one).
void HSHomeObject::set_semaphore() {
    smphSignal = std::make_shared< std::binary_semaphore >(0);
}

// Debug-build test hook: releases smphSignal once if a semaphore has been installed; otherwise a no-op.
void HSHomeObject::release_semaphore() {
    if (!smphSignal) { return; }
    smphSignal->release();
}
#endif

void HSHomeObject::register_homestore_metablk_callback() {
// register some callbacks for metadata recovery;
Expand Down
23 changes: 20 additions & 3 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <memory>
#include <mutex>
#include <semaphore>

#include <homestore/homestore.hpp>
#include <homestore/index/index_table.hpp>
Expand Down Expand Up @@ -216,6 +217,10 @@ class HSHomeObject : public HomeObjectImpl {
shared< HeapChunkSelector > chunk_selector_;
bool recovery_done_{false};

// used for testing when we only have solo_repl_dev.
// TODO: remove or change this when we have raft_repl_dev.
shared< std::binary_semaphore > smphSignal;

private:
static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); }

Expand All @@ -229,6 +234,10 @@ class HSHomeObject : public HomeObjectImpl {
void update_shard_in_map(const ShardInfo& shard_info);
void do_shard_message_commit(int64_t lsn, ReplicationMessageHeader& header, homestore::MultiBlkId const& blkids,
sisl::blob value, cintrusive< homestore::repl_req_ctx >& hs_ctx);
bool do_shard_message_pre_commit(int64_t lsn, ReplicationMessageHeader& header, sisl::blob value,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
void do_shard_message_rollback(int64_t lsn, ReplicationMessageHeader& header, sisl::blob value,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
// recover part
void register_homestore_metablk_callback();
void on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie);
Expand Down Expand Up @@ -291,15 +300,19 @@ class HSHomeObject : public HomeObjectImpl {
cshared< HeapChunkSelector > chunk_selector() const { return chunk_selector_; }

bool on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);
cintrusive< homestore::repl_req_ctx >& hs_ctx);
void on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);
cintrusive< homestore::repl_req_ctx >& hs_ctx);
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);

// Blob manager related.
void on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx);
bool on_blob_put_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
void on_blob_put_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
void on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
homestore::blk_alloc_hints blob_put_get_blk_alloc_hints(sisl::blob const& header,
Expand All @@ -321,7 +334,11 @@ class HSHomeObject : public HomeObjectImpl {
const BlobInfo& blob_info);
void print_btree_index(pg_id_t pg_id);

// void trigger_timed_events();
// used for testing when we only have solo_repl_dev.
#ifndef NDEBUG
void set_semaphore();
void release_semaphore();
#endif
};

class BlobIndexServiceCallbacks : public homestore::IndexServiceCallbacks {
Expand Down
122 changes: 110 additions & 12 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,100 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const
return req->result();
}

bool HSHomeObject::on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
if (hs_ctx != nullptr) {
auto ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx);
return do_shard_message_pre_commit(
lsn, *r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())), ctx->hdr_buf_, hs_ctx);
}
return true;
}

bool HSHomeObject::do_shard_message_pre_commit(int64_t lsn, ReplicationMessageHeader& header, sisl::blob value,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};
if (hs_ctx != nullptr) {
ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get();
}

if (ctx) {
RELEASE_ASSERT(!ctx->promise_.isFulfilled(), "do_shard_message_pre_commit promise is already fulfilled");
}

if (header.corrupted()) {
LOGW("replication message header is corrupted with crc error, lsn:{}", lsn);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
return false;
}

if (crc32_ieee(init_crc32, r_cast< const unsigned char* >(value.bytes()), value.size()) != header.payload_crc) {
// header & value is inconsistent;
LOGW("replication message header is inconsistent with value, lsn:{}", lsn);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
return false;
}

auto shard_info = deserialize_shard_info(r_cast< const char* >(value.bytes()), value.size());
switch (header.msg_type) {
case ReplicationMessageType::SEAL_SHARD_MSG: {
// we can not release chunk here, since if rollback happens, we can not make sure we can get the same chunk.
// chunk selector will always return the a chunk of least used, and GC will happen at the time window
// we can not make sure we can get the same chunk as before, so we need to wait for the commit phase to release
// chunk;
update_shard_in_map(shard_info);
break;
}
default: {
break;
}
}

return true;
}

void HSHomeObject::on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
if (hs_ctx != nullptr) {
auto ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx);
do_shard_message_rollback(lsn, *r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())),
ctx->hdr_buf_, hs_ctx);
}
}

// Undoes the pre-commit effect of a shard replication message.
//
// Returns without changing anything if the header reports corruption or if the CRC computed over `value`
// differs from header.payload_crc. For SEAL_SHARD_MSG the deserialized shard info is written back into
// the shard map with its state forced to OPEN; other message types need no rollback work.
//
// The request's promise is intentionally never fulfilled here:
//   1. if the actual execution happens in pre_commit, the promise is fulfilled in pre_commit;
//   2. if the actual execution happens in commit, the promise is fulfilled in commit.
//
// @param lsn     log sequence number, used for logging only.
// @param header  replication message header describing `value`.
// @param value   serialized shard info.
// @param hs_ctx  replication request context; unused.
void HSHomeObject::do_shard_message_rollback(int64_t lsn, ReplicationMessageHeader& header, sisl::blob value,
                                             [[maybe_unused]] cintrusive< homestore::repl_req_ctx >& hs_ctx) {
    if (header.corrupted()) {
        LOGW("replication message header is corrupted with crc error, lsn:{}", lsn);
        return;
    }

    // The payload must match the CRC recorded in the header.
    auto const computed_crc = crc32_ieee(init_crc32, r_cast< const unsigned char* >(value.bytes()), value.size());
    if (computed_crc != header.payload_crc) {
        LOGW("replication message header is inconsistent with value, lsn:{}", lsn);
        return;
    }

    // TODO:: what if pre_commit succeeded but rollback fails (e.g., log header corrupted)?
    // It seems we would just crash if that happens.

    auto shard_info = deserialize_shard_info(r_cast< const char* >(value.bytes()), value.size());
    if (header.msg_type == ReplicationMessageType::SEAL_SHARD_MSG) {
        // Revert the seal recorded during pre-commit.
        shard_info.state = ShardInfo::State::OPEN;
        update_shard_in_map(shard_info);
    }
}

void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids,
homestore::ReplDev* repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
Expand Down Expand Up @@ -180,24 +274,30 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader
homestore::MultiBlkId const& blkids, sisl::blob value,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};

// for create_shard, we need to fulfill the promise;
// for seal_shard, the promise is already fulfilled in pre_commit;
bool ctxFullfilled{true};
if (hs_ctx != nullptr) {
ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get();
ctxFullfilled = ctx->promise_.isFulfilled();
}

if (header.corrupted()) {
LOGW("replication message header is corrupted with crc error, lsn:{}", lsn);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
if (!ctxFullfilled) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
return;
}

if (crc32_ieee(init_crc32, value.cbytes(), value.size()) != header.payload_crc) {
// header & value is inconsistent;
LOGW("replication message header is inconsistent with value, lsn:{}", lsn);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
if (!ctxFullfilled) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
return;
}

auto shard_info = deserialize_shard_info(r_cast< const char* >(value.cbytes()), value.size());
// TODO:: for seal_shard, what if we pre_commit successfully , but commit failed(e.g., log header corrupted) ?
auto shard_info = deserialize_shard_info(r_cast< const char* >(value.bytes()), value.size());
switch (header.msg_type) {
case ReplicationMessageType::CREATE_SHARD_MSG: {
bool shard_exist = false;
Expand All @@ -215,7 +315,6 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader

break;
}

case ReplicationMessageType::SEAL_SHARD_MSG: {
ShardInfo::State state;
{
Expand All @@ -225,21 +324,20 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader
state = (*iter->second)->info.state;
}

if (state == ShardInfo::State::OPEN) {
auto chunk_id = get_shard_chunk(shard_info.id);
RELEASE_ASSERT(chunk_id.has_value(), "Chunk id not found");
chunk_selector()->release_chunk(chunk_id.value());
update_shard_in_map(shard_info);
}
RELEASE_ASSERT(state == ShardInfo::State::SEALED, "Shard should be sealed before commit");

break;
auto chunk_id = get_shard_chunk(shard_info.id);
RELEASE_ASSERT(chunk_id.has_value(), "Chunk id not found");
// when restarting, the chunk is not selected since the metablk indicates the shard is sealed.
// handle this in chunk selector
chunk_selector()->release_chunk(chunk_id.value());
}
default: {
break;
}
}

if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }
if (!ctxFullfilled) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }
}

void HSHomeObject::add_new_shard_to_map(ShardPtr&& shard) {
Expand Down
Loading

0 comments on commit f3a4ed9

Please sign in to comment.