diff --git a/src/include/homeobject/common.hpp b/src/include/homeobject/common.hpp index 10dd1f52..830ffedc 100644 --- a/src/include/homeobject/common.hpp +++ b/src/include/homeobject/common.hpp @@ -32,6 +32,8 @@ using peer_id = boost::uuids::uuid; using pg_id = uint16_t; using shard_id = uint64_t; +static constexpr uint32_t init_crc32 = 0; + template < class E > class Manager { public: diff --git a/src/include/homeobject/pg_manager.hpp b/src/include/homeobject/pg_manager.hpp index 52a16bd6..47f6372a 100644 --- a/src/include/homeobject/pg_manager.hpp +++ b/src/include/homeobject/pg_manager.hpp @@ -31,7 +31,7 @@ struct PGInfo { struct PG { explicit PG(PGInfo info) : pg_info(std::move(info)) {} PGInfo pg_info; - uint64_t next_sequence_num_for_new_shard{0}; + uint64_t shard_sequence_num{0}; CShardInfoList shards; }; diff --git a/src/lib/homeobject_impl.hpp b/src/lib/homeobject_impl.hpp index 6f39be2c..b05f363a 100644 --- a/src/lib/homeobject_impl.hpp +++ b/src/lib/homeobject_impl.hpp @@ -80,6 +80,7 @@ class HomeObjectImpl : public HomeObject, ShardManager::AsyncResult< ShardInfo > create_shard(pg_id pg_owner, uint64_t size_bytes) final; ShardManager::AsyncResult< InfoList > list_shards(pg_id pg) const final; ShardManager::AsyncResult< ShardInfo > seal_shard(shard_id id) final; + uint64_t get_current_timestamp(); static const std::string s_shard_info_sub_type; /// BlobManager diff --git a/src/lib/homestore/homeobject.cpp b/src/lib/homestore/homeobject.cpp index c0bde80d..2fa74a9b 100644 --- a/src/lib/homestore/homeobject.cpp +++ b/src/lib/homestore/homeobject.cpp @@ -36,30 +36,15 @@ HSHomeObject::HSHomeObject(std::weak_ptr< HomeObjectApplication >&& application) } void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { - const char* shard_info_json_str = r_cast(buf.bytes()); - auto shard_info_json = nlohmann::json::parse(shard_info_json_str, shard_info_json_str + size); + std::string shard_info_str; + 
shard_info_str.append(r_cast(buf.bytes()), size); - CShardInfo shard_info = std::make_shared(); - shard_info->id = shard_info_json["shard_id"].get(); - shard_info->placement_group = shard_info_json["pg_id"].get(); - shard_info->state = static_cast(shard_info_json["state"].get()); - shard_info->total_capacity_bytes = shard_info_json["capacity"].get(); - - std::scoped_lock lock_guard(_pg_lock, _shard_lock); + CShardInfo shard_info = deserialize_shard_info(shard_info_str); if (shard_info->state == ShardInfo::State::OPEN) { // create shard; - auto pg_iter = _pg_map.find(shard_info->placement_group); - RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info"); - pg_iter->second.shards.push_back(shard_info); - auto sequence_num = get_sequence_num_from_shard_id(shard_info->id); - if (sequence_num > pg_iter->second.next_sequence_num_for_new_shard) { - pg_iter->second.next_sequence_num_for_new_shard = sequence_num; - } - _shard_map[shard_info->id] = shard_info; + do_commit_new_shard(shard_info); } else { - auto shard_iter = _shard_map.find(shard_info->id); - RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info"); - shard_iter->second = shard_info; + do_commit_seal_shard(shard_info); } } diff --git a/src/lib/homestore/homeobject.hpp b/src/lib/homestore/homeobject.hpp index d0016b67..94c55fa2 100644 --- a/src/lib/homestore/homeobject.hpp +++ b/src/lib/homestore/homeobject.hpp @@ -29,14 +29,17 @@ class HSHomeObject : public HomeObjectImpl { shard_id generate_new_shard_id(pg_id pg); shard_id make_new_shard_id(pg_id pg, uint64_t sequence_num) const; uint64_t get_sequence_num_from_shard_id(uint64_t shard_id); - std::string prepare_create_shard_message(pg_id pg, shard_id new_shard_id, uint64_t shard_size) const; + std::string serialize_shard_info(CShardInfo shard_info) const; + CShardInfo deserialize_shard_info(const std::string& shard_info_str) const; + void do_commit_new_shard(CShardInfo shard_info); + void do_commit_seal_shard(CShardInfo shard_info); public: 
using HomeObjectImpl::HomeObjectImpl; HSHomeObject(std::weak_ptr< HomeObjectApplication >&& application); ~HSHomeObject() override = default; void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size); void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, - sisl::sg_list const& value, void* user_ctx); + void* user_ctx); }; } // namespace homeobject diff --git a/src/lib/homestore/replication_message.hpp b/src/lib/homestore/replication_message.hpp index d937cbe4..d8784780 100644 --- a/src/lib/homestore/replication_message.hpp +++ b/src/lib/homestore/replication_message.hpp @@ -1,5 +1,7 @@ #pragma once +#include "homeobject/common.hpp" + #include #include @@ -9,16 +11,18 @@ ENUM(ReplicationMessageType, uint16_t, PG_MESSAGE = 0, SHARD_MESSAGE, BLOB_MESSA // magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum' static constexpr uint64_t HOMEOBJECT_REPLICATION_MAGIC = 0x11153ca24efc8d34; +static constexpr uint32_t HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1 = 0x01; struct ReplicationMessageHeader { uint64_t magic_num{HOMEOBJECT_REPLICATION_MAGIC}; + uint32_t protocol_version{HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1}; ReplicationMessageType message_type; - uint32_t message_size; - uint32_t message_crc; - uint8_t reserved_pad[2]{}; + uint32_t payload_size; + uint32_t payload_crc; + uint8_t reserved_pad[6]{}; uint32_t header_crc; uint32_t calculate_crc() const { - return crc32_ieee(0, r_cast(this), sizeof(*this) - sizeof(header_crc)); + return crc32_ieee(init_crc32, r_cast(this), sizeof(*this) - sizeof(header_crc)); } }; diff --git a/src/lib/homestore/replication_state_machine.cpp b/src/lib/homestore/replication_state_machine.cpp index 623af65a..fd8b83e4 100644 --- a/src/lib/homestore/replication_state_machine.cpp +++ b/src/lib/homestore/replication_state_machine.cpp @@ -5,13 +5,12 @@ namespace homeobject { void ReplicationStateMachine::on_commit(int64_t lsn, sisl::blob const& 
header, sisl::blob const& key, blkid_list_t const& blkids,void* ctx) { + LOGINFO("applying raft log commit with lsn:{}", lsn); const ReplicationMessageHeader* msg_header = r_cast(header.bytes); - //TODO: read data from pba or get data from replication context(prefered) - sisl::sg_list value; switch (msg_header->message_type) { case SHARD_MESSAGE: { - home_object->on_shard_message_commit(lsn, header, key, value, ctx); + home_object->on_shard_message_commit(lsn, header, key,ctx); } case PG_MESSAGE: case BLOB_MESSAGE: diff --git a/src/lib/homestore/shard_manager.cpp b/src/lib/homestore/shard_manager.cpp index a9734314..f12c422e 100644 --- a/src/lib/homestore/shard_manager.cpp +++ b/src/lib/homestore/shard_manager.cpp @@ -22,7 +22,7 @@ shard_id HSHomeObject::generate_new_shard_id(pg_id pg) { auto created_pg = PG(std::move(PGInfo(pg))); iter = _pg_map.insert(std::make_pair(pg, std::move(created_pg))).first; } - auto new_sequence_num = ++iter->second.next_sequence_num_for_new_shard; + auto new_sequence_num = ++iter->second.shard_sequence_num; RELEASE_ASSERT(new_sequence_num < ShardManager::max_shard_num_in_pg(), "new shard id must be less than ShardManager::max_shard_num_in_pg()"); return make_new_shard_id(pg, new_sequence_num); } @@ -34,15 +34,34 @@ shard_id HSHomeObject::make_new_shard_id(pg_id pg, uint64_t sequence_num) const uint64_t HSHomeObject::get_sequence_num_from_shard_id(uint64_t shard_id) { return shard_id & (0x0000FFFFFFFFFFFF); } -std::string HSHomeObject::prepare_create_shard_message(pg_id pg, shard_id new_shard_id, uint64_t shard_size) const { + +std::string HSHomeObject::serialize_shard_info(CShardInfo shard_info) const { nlohmann::json j; - j["pg_id"] = pg; - j["shard_id"] = new_shard_id; - j["state"] = ShardInfo::State::OPEN; - j["capacity"] = shard_size; + j["shard_id"] = shard_info->id; + j["pg_id"] = shard_info->placement_group; + j["state"] = shard_info->state; + j["created_time"] = shard_info->created_time; + j["modified_time"] = 
shard_info->last_modified_time; + j["total_capacity"] = shard_info->total_capacity_bytes; + j["available_capacity"] = shard_info->available_capacity_bytes; + j["deleted_capacity"] = shard_info->deleted_capacity_bytes; return j.dump(); } +CShardInfo HSHomeObject:: deserialize_shard_info(const std::string& shard_info_str) const { + auto shard_info_json = nlohmann::json::parse(shard_info_str); + CShardInfo shard_info = std::make_shared(); + shard_info->id = shard_info_json["shard_id"].get(); + shard_info->placement_group = shard_info_json["pg_id"].get(); + shard_info->state = static_cast(shard_info_json["state"].get()); + shard_info->created_time = shard_info_json["created_time"].get(); + shard_info->last_modified_time = shard_info_json["modified_time"].get(); + shard_info->available_capacity_bytes = shard_info_json["available_capacity"].get(); + shard_info->total_capacity_bytes = shard_info_json["total_capacity"].get(); + shard_info->deleted_capacity_bytes = shard_info_json["deleted_capacity"].get(); + return shard_info; +} + ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id pg_owner, uint64_t size_bytes) { bool pg_exist = check_if_pg_exist(pg_owner); if (!pg_exist) { @@ -62,19 +81,26 @@ ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id pg_owner, ui } auto new_shard_id = generate_new_shard_id(pg_owner); - std::string create_shard_message = prepare_create_shard_message(pg_owner, new_shard_id, size_bytes); - ReplicationMessageHeader header; - header.message_type = ReplicationMessageType::SHARD_MESSAGE; - header.message_size = create_shard_message.size(); - header.message_crc = crc32_ieee(0, r_cast< const uint8_t* >(create_shard_message.c_str()), create_shard_message.size()); - header.header_crc = header.calculate_crc(); + auto create_time = get_current_timestamp(); + auto shard_info = std::make_shared(ShardInfo(new_shard_id, pg_owner, ShardInfo::State::OPEN, + create_time, create_time, size_bytes, size_bytes, 0)); + std::string 
create_shard_message = serialize_shard_info(shard_info); + //prepare msg header; + const uint32_t needed_size = sizeof(ReplicationMessageHeader) + create_shard_message.size(); + auto buf = nuraft::buffer::alloc(needed_size); + uint8_t* raw_ptr = static_cast(buf->data_begin()); + ReplicationMessageHeader *header = new(raw_ptr) ReplicationMessageHeader(); + header->message_type = ReplicationMessageType::SHARD_MESSAGE; + header->payload_size = create_shard_message.size(); + header->payload_crc = crc32_ieee(init_crc32, r_cast< const uint8_t* >(create_shard_message.c_str()), create_shard_message.size()); + header->header_crc = header->calculate_crc(); + raw_ptr += sizeof(ReplicationMessageHeader); + std::memcpy(raw_ptr, create_shard_message.c_str(), create_shard_message.size()); + sisl::sg_list value; - value.size = create_shard_message.size(); - value.iovs.emplace_back(iovec{static_cast(const_cast(create_shard_message.c_str())), - create_shard_message.size()}); //replicate this create shard message to PG members; auto [p, sf] = folly::makePromiseContract< ShardManager::Result >(); - repl_dev->async_alloc_write(sisl::blob(uintptr_cast(&header), sizeof(header)), + repl_dev->async_alloc_write(sisl::blob(buf->data_begin(), needed_size), sisl::blob(), value, static_cast(&p)); @@ -82,6 +108,7 @@ ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id pg_owner, ui if (!bool(info)) { LOGWARN("create new shard [{}] on pg [{}] is failed with error:{}", new_shard_id, pg_owner, info.error()); } + header->~ReplicationMessageHeader(); return info; } @@ -98,7 +125,7 @@ ShardManager::Result< InfoList > HSHomeObject::_list_shards(pg_id pg) const { } void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, - sisl::sg_list const& value, void* user_ctx) { + void* user_ctx) { const ReplicationMessageHeader* msg_header = r_cast(header.bytes); folly::Promise > *promise = nullptr; // user_ctx will be nullptr when: @@ -117,12 +144,10 
@@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header } std::string string_value; - for (auto & iovec: value.iovs) { - string_value.append(static_cast(iovec.iov_base), iovec.iov_len); - } + string_value.append(r_cast(header.bytes + sizeof(ReplicationMessageHeader)), msg_header->payload_size); - auto crc = crc32_ieee(0, r_cast< const uint8_t* >(string_value.c_str()), string_value.size()); - if (msg_header->message_crc != crc) { + auto crc = crc32_ieee(init_crc32, r_cast< const uint8_t* >(string_value.c_str()), string_value.size()); + if (msg_header->payload_crc != crc) { LOGWARN("replication message body is corrupted with crc error, lsn:{}", lsn); if (promise) { promise->setValue(folly::makeUnexpected(ShardError::INVALID_ARG)); @@ -130,26 +155,43 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header return; } - //preapre ShardInfoExt msg and persisted this ShardInfoExt to homestore MetaBlkService; + //TODO: prepare ShardInfoExt msg with chunk_id and persist the ShardInfoExt to homestore MetaBlkService; void* cookie = nullptr; homestore::hs()->meta_service().add_sub_sb(HomeObjectImpl::s_shard_info_sub_type, r_cast(string_value.c_str()), string_value.size(), cookie); //update in-memory shard map; - CShardInfo shard_info = std::make_shared(); - auto shard_info_json = nlohmann::json::parse(string_value); - shard_info->id = shard_info_json["shard_id"].get(); - shard_info->placement_group = shard_info_json["pg_id"].get(); - shard_info->state = static_cast(shard_info_json["state"].get()); - shard_info->total_capacity_bytes = shard_info_json["capacity"].get(); - std::scoped_lock lock_guard(_pg_lock, _shard_lock); + CShardInfo shard_info = deserialize_shard_info(string_value); + if (shard_info->state == ShardInfo::State::OPEN) { + //create shard; + do_commit_new_shard(shard_info); + } else { + do_commit_seal_shard(shard_info); + } + + if (promise) { + promise->setValue(ShardManager::Result(*shard_info)); + } +} + 
+void HSHomeObject::do_commit_new_shard(CShardInfo shard_info) { + std::scoped_lock lock_guard(_pg_lock, _shard_lock); auto pg_iter = _pg_map.find(shard_info->placement_group); RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info"); pg_iter->second.shards.push_back(shard_info); - _shard_map[shard_info->id] = shard_info; - if (promise) { - promise->setValue(ShardManager::Result(*shard_info)); + //the following part is required for follower members or when a member is restarted; + auto sequence_num = get_sequence_num_from_shard_id(shard_info->id); + if (sequence_num > pg_iter->second.shard_sequence_num) { + pg_iter->second.shard_sequence_num = sequence_num; } + _shard_map[shard_info->id] = shard_info; +} + +void HSHomeObject::do_commit_seal_shard(CShardInfo shard_info) { + std::scoped_lock lock_guard(_shard_lock); + auto shard_iter = _shard_map.find(shard_info->id); + RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info"); + *(shard_iter->second) = *shard_info; } } // namespace homeobject diff --git a/src/lib/memory/shard_manager.cpp b/src/lib/memory/shard_manager.cpp index b6403480..a30ddb6f 100644 --- a/src/lib/memory/shard_manager.cpp +++ b/src/lib/memory/shard_manager.cpp @@ -6,13 +6,6 @@ namespace homeobject { uint64_t ShardManager::max_shard_size() { return Gi; } -static uint64_t get_current_timestamp() { - auto now = std::chrono::system_clock::now(); - auto duration = std::chrono::duration_cast< std::chrono::milliseconds >(now.time_since_epoch()); - auto timestamp = static_cast< uint64_t >(duration.count()); - return timestamp; -} - ShardManager::Result< ShardInfo > MemoryHomeObject::_create_shard(pg_id pg_owner, uint64_t size_bytes) { auto lg = std::scoped_lock(_pg_lock, _shard_lock); auto pg_it = _pg_map.find(pg_owner); diff --git a/src/lib/shard_manager.cpp b/src/lib/shard_manager.cpp index c46b7e45..b627fa61 100644 --- a/src/lib/shard_manager.cpp +++ b/src/lib/shard_manager.cpp @@ -31,4 +31,11 @@ ShardManager::AsyncResult< ShardInfo > 
HomeObjectImpl::seal_shard(shard_id id) { ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::get_shard(shard_id id) const { return _get_shard(id); } +uint64_t HomeObjectImpl::get_current_timestamp() { + auto now = std::chrono::system_clock::now(); + auto duration = std::chrono::duration_cast< std::chrono::milliseconds >(now.time_since_epoch()); + auto timestamp = static_cast< uint64_t >(duration.count()); + return timestamp; +} + } // namespace homeobject