diff --git a/src/lib/homestore/shard_manager.cpp b/src/lib/homestore/shard_manager.cpp index 9e68480b..be2b1f84 100644 --- a/src/lib/homestore/shard_manager.cpp +++ b/src/lib/homestore/shard_manager.cpp @@ -70,7 +70,7 @@ ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id pg_owner, ui auto replica_set_var = _repl_svc->get_replica_set(fmt::format("{}", pg_owner)); if (std::holds_alternative< home_replication:: ReplServiceError>(replica_set_var)) { LOGWARN("failed to get replica set instance for pg [{}]", pg_owner); - return folly::makeUnexpected(ShardError::UNKNOWN_PG); + return folly::makeUnexpected(ShardError::UNKNOWN_PG); } auto replica_set = std::get< home_replication::rs_ptr_t >(replica_set_var); @@ -127,7 +127,7 @@ bool HSHomeObject::precheck_and_decode_shard_msg(int64_t lsn, sisl::blob const& auto crc = crc32_ieee(init_crc32, r_cast< const uint8_t* >(shard_msg.c_str()), shard_msg.size()); if (msg_header->payload_crc != crc) { LOGWARN("replication message body is corrupted with crc error, lsn:{}", lsn); - return false; + return false; } *msg = std::move(shard_msg); return true; @@ -213,6 +213,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header if (promise) { promise->setValue(folly::makeUnexpected(ShardError::INVALID_ARG)); } + return; } auto shard = deserialize_shard(shard_msg); diff --git a/src/lib/homestore/tests/test_shard_manager.cpp b/src/lib/homestore/tests/test_shard_manager.cpp index 876af24e..f6dd9dc4 100644 --- a/src/lib/homestore/tests/test_shard_manager.cpp +++ b/src/lib/homestore/tests/test_shard_manager.cpp @@ -12,6 +12,7 @@ #define protected public #include "lib/homestore/homeobject.hpp" +#include "lib/homestore/replication_message.hpp" #include "lib/homestore/replication_state_machine.hpp" #include "mocks/mock_replica_set.hpp" @@ -158,7 +159,6 @@ TEST_F(ShardManagerWithShardsTesting, GetKnownShard) { ASSERT_TRUE(!!e); ShardInfo shard_info = e.value(); auto future = _home_object->shard_manager()->get_shard(shard_info.id).get(); - ASSERT_TRUE(!!e); future.then([this, shard_info](auto const& info) { EXPECT_TRUE(info.id == shard_info.id); EXPECT_TRUE(info.placement_group == _pg_id); @@ -180,6 +180,145 @@ TEST_F(ShardManagerWithShardsTesting, ListShards) { }); } +TEST_F(ShardManagerWithShardsTesting, SealUnknownShard) { + EXPECT_EQ(ShardError::UNKNOWN_SHARD, _home_object->shard_manager()->seal_shard(1000).get().error()); +} + +TEST_F(ShardManagerWithShardsTesting, MockSealShard) { + auto e = _home_object->shard_manager()->create_shard(_pg_id, Mi).get(); + ASSERT_TRUE(!!e); + ShardInfo shard_info = e.value(); + auto shard = homeobject::Shard(shard_info); + shard.info.state = ShardInfo::State::SEALED; + nlohmann::json j; + j["shard_info"]["shard_id"] = shard.info.id; + j["shard_info"]["pg_id"] = shard.info.placement_group; + j["shard_info"]["state"] = shard.info.state; + j["shard_info"]["created_time"] = shard.info.created_time; + j["shard_info"]["modified_time"] = shard.info.last_modified_time; + j["shard_info"]["total_capacity"] = shard.info.total_capacity_bytes; + j["shard_info"]["available_capacity"] = shard.info.available_capacity_bytes; + j["shard_info"]["deleted_capacity"] = shard.info.deleted_capacity_bytes; + j["ext_info"]["chunk_id"] = shard.chunk_id; + auto seal_shard_msg = j.dump(); + const uint32_t needed_size = sizeof(homeobject::ReplicationMessageHeader) + seal_shard_msg.size(); + auto buf = nuraft::buffer::alloc(needed_size); + uint8_t* raw_ptr = buf->data_begin(); + homeobject::ReplicationMessageHeader *header = new(raw_ptr) homeobject::ReplicationMessageHeader(); + header->message_type = homeobject::ReplicationMessageType::SHARD_MESSAGE; + header->payload_size = seal_shard_msg.size(); + uint32_t expected_crc = crc32_ieee(homeobject::init_crc32, r_cast< const uint8_t* >(seal_shard_msg.c_str()), seal_shard_msg.size()); + + header->payload_crc = expected_crc; + header->header_crc = header->calculate_crc(); + ++header->header_crc; + raw_ptr += sizeof(homeobject::ReplicationMessageHeader); + std::memcpy(raw_ptr, seal_shard_msg.c_str(), seal_shard_msg.size()); + homeobject::HSHomeObject* ho = dynamic_cast(_home_object.get()); + ho->on_pre_commit_shard_msg(100, sisl::blob(buf->data_begin(), needed_size), sisl::blob(), nullptr); + ho->on_shard_message_commit(100, sisl::blob(buf->data_begin(), needed_size), sisl::blob(), nullptr); + //nothing will happen and shard state is still OPEN; + auto future = _home_object->shard_manager()->get_shard(shard_info.id).get(); + ASSERT_TRUE(!!future); + future.then([this, shard_info](auto const& info) { + EXPECT_TRUE(info.id == shard_info.id); + EXPECT_TRUE(info.placement_group == _pg_id); + EXPECT_EQ(info.state, ShardInfo::State::OPEN); + }); + + //given a wrong body crc; + header->payload_crc = expected_crc + 1; + header->header_crc = header->calculate_crc(); + ho->on_pre_commit_shard_msg(100, sisl::blob(buf->data_begin(), needed_size), sisl::blob(), nullptr); + ho->on_shard_message_commit(100, sisl::blob(buf->data_begin(), needed_size), sisl::blob(), nullptr); + future = _home_object->shard_manager()->get_shard(shard_info.id).get(); + ASSERT_TRUE(!!future); + future.then([this, shard_info](auto const& info) { + EXPECT_TRUE(info.id == shard_info.id); + EXPECT_TRUE(info.placement_group == _pg_id); + EXPECT_EQ(info.state, ShardInfo::State::OPEN); + }); + + //everything is fine but it is rollbacked; + header->payload_crc = expected_crc; + header->header_crc = header->calculate_crc(); + ho->on_pre_commit_shard_msg(100, sisl::blob(buf->data_begin(), needed_size), sisl::blob(), nullptr); + ho->on_rollback_shard_msg(100, sisl::blob(buf->data_begin(), needed_size), sisl::blob(), nullptr); + future = _home_object->shard_manager()->get_shard(shard_info.id).get(); + ASSERT_TRUE(!!future); + future.then([this, shard_info](auto const& info) { + EXPECT_TRUE(info.id == shard_info.id); + EXPECT_TRUE(info.placement_group == _pg_id); + EXPECT_EQ(info.state, ShardInfo::State::OPEN); + }); + + //everything is fine; + ho->on_pre_commit_shard_msg(100, sisl::blob(buf->data_begin(), needed_size), sisl::blob(), nullptr); + ho->on_shard_message_commit(100, sisl::blob(buf->data_begin(), needed_size), sisl::blob(), nullptr); + future = _home_object->shard_manager()->get_shard(shard_info.id).get(); + ASSERT_TRUE(!!future); + future.then([this, shard_info](auto const& info) { + EXPECT_TRUE(info.id == shard_info.id); + EXPECT_TRUE(info.placement_group == _pg_id); + EXPECT_EQ(info.state, ShardInfo::State::SEALED); + }); + + auto pg_iter = ho->_pg_map.find(_pg_id); + EXPECT_TRUE(pg_iter != ho->_pg_map.end()); + auto& pg = pg_iter->second; + EXPECT_EQ(1, pg.shards.size()); + auto& check_shard = *pg.shards.begin(); + EXPECT_EQ(ShardInfo::State::SEALED, check_shard.info.state); + EXPECT_TRUE(check_shard.metablk_cookie != nullptr); +} + +TEST_F(ShardManagerWithShardsTesting, ShardManagerRecovery) { + auto e = _home_object->shard_manager()->create_shard(_pg_id, Mi).get(); + ASSERT_TRUE(!!e); + ShardInfo shard_info = e.value(); + EXPECT_EQ(ShardInfo::State::OPEN, shard_info.state); + EXPECT_EQ(Mi, shard_info.total_capacity_bytes); + EXPECT_EQ(Mi, shard_info.available_capacity_bytes); + EXPECT_EQ(0ul, shard_info.deleted_capacity_bytes); + EXPECT_EQ(_pg_id, shard_info.placement_group); + + nlohmann::json shard_json; + shard_json["shard_info"]["shard_id"] = shard_info.id; + shard_json["shard_info"]["pg_id"] = shard_info.placement_group; + shard_json["shard_info"]["state"] = shard_info.state; + shard_json["shard_info"]["created_time"] = shard_info.created_time; + shard_json["shard_info"]["modified_time"] = shard_info.last_modified_time; + shard_json["shard_info"]["total_capacity"] = shard_info.total_capacity_bytes; + shard_json["shard_info"]["available_capacity"] = shard_info.available_capacity_bytes; + shard_json["shard_info"]["deleted_capacity"] = shard_info.deleted_capacity_bytes; + shard_json["ext_info"]["chunk_id"] = 100; + auto shard_msg = shard_json.dump(); + + //Manual remove shard info from home_object and relay on metablk service to replay it back; + homeobject::HSHomeObject* ho = dynamic_cast(_home_object.get()); + auto pg_iter = ho->_pg_map.find(_pg_id); + EXPECT_TRUE(pg_iter != ho->_pg_map.end()); + auto& pg = pg_iter->second; + EXPECT_EQ(1, pg.shards.size()); + auto& check_shard = *pg.shards.begin(); + void* saved_metablk = check_shard.metablk_cookie; + pg_iter->second.shards.clear(); + ho->_shard_map.clear(); + EXPECT_EQ(ShardError::UNKNOWN_SHARD, _home_object->shard_manager()->get_shard(_shard_id).get().error()); + + auto buf = sisl::make_byte_array(static_cast< uint32_t >(shard_msg.size()), 0, sisl::buftag::metablk); + std::memcpy(buf->bytes, shard_msg.c_str(), shard_msg.size()); + ho->on_shard_meta_blk_found(static_cast(saved_metablk), buf, shard_msg.size()); + //check the recover result; + auto future = _home_object->shard_manager()->get_shard(shard_info.id).get(); + EXPECT_TRUE(!!future); + future.then([this, shard_info](auto const& info) { + EXPECT_TRUE(info.id == shard_info.id); + EXPECT_TRUE(info.placement_group == _pg_id); + EXPECT_EQ(info.state, ShardInfo::State::OPEN); + }); +} + int main(int argc, char* argv[]) { int parsed_argc = argc; ::testing::InitGoogleTest(&parsed_argc, argv);