diff --git a/conanfile.py b/conanfile.py
index 337a7cc..93e9ccd 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -49,7 +49,7 @@ def build_requirements(self):
     def requirements(self):
         self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
-        self.requires("homestore/[^6.4]@oss/master")
+        self.requires("homestore/6.5.19@oss/master")
         self.requires("iomgr/[^11.3]@oss/master")
         self.requires("lz4/1.9.4", override=True)
         self.requires("openssl/3.3.1", override=True)
diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp
index 4aabff1..4f3be36 100644
--- a/src/lib/homestore_backend/hs_homeobject.hpp
+++ b/src/lib/homestore_backend/hs_homeobject.hpp
@@ -328,8 +328,8 @@ class HSHomeObject : public HomeObjectImpl {
         std::vector< ShardEntry > shard_list_{0};
-        objId cur_obj_id_{1, 0};
-        uint64_t cur_shard_idx_{0};
+        objId cur_obj_id_{0, 0};
+        int64_t cur_shard_idx_{-1};
         std::vector cur_blob_list_{0};
         uint64_t cur_start_blob_idx_{0};
         uint64_t cur_batch_blob_count_{0};
diff --git a/src/lib/homestore_backend/pg_blob_iterator.cpp b/src/lib/homestore_backend/pg_blob_iterator.cpp
index 8ffd0dd..f0afa8e 100644
--- a/src/lib/homestore_backend/pg_blob_iterator.cpp
+++ b/src/lib/homestore_backend/pg_blob_iterator.cpp
@@ -64,7 +64,7 @@ objId HSHomeObject::PGBlobIterator::expected_next_obj_id() {
         return objId(cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id + 1);
     }
     //next shard
-    if (cur_shard_idx_ < shard_list_.size() - 1) {
+    if (cur_shard_idx_ < static_cast< int64_t >(shard_list_.size() - 1)) {
         auto next_shard_seq_num = shard_list_[cur_shard_idx_ + 1].info.id & 0xFFFFFFFFFFFF;
         return objId(next_shard_seq_num, 0);
     }
@@ -103,7 +103,7 @@ bool HSHomeObject::PGBlobIterator::create_pg_snapshot_data(sisl::io_blob_safe& m
         shard_ids.push_back(shard.info.id);
     }
-    auto pg_entry = CreateResyncPGMetaDataDirect(builder_, pg_info.id, &uuid, pg->durable_entities().blob_sequence_num,
+    auto pg_entry = CreateResyncPGMetaDataDirect(builder_, pg_info.id, &uuid, pg_info.size, pg_info.chunk_size, pg->durable_entities().blob_sequence_num,
                                                  pg->shard_sequence_num_, &members, &shard_ids);
     builder_.FinishSizePrefixed(pg_entry);
@@ -212,7 +212,7 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
         }
         sisl::io_blob_safe blob;
-        auto retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry);
+        uint8_t retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry);
         for (int i = 0; i < retries; i++) {
             auto result = load_blob_data(info, state).get();
             if (result.hasError() && result.error().code == BlobErrorCode::READ_FAILED) {
@@ -241,7 +241,7 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
     builder_.FinishSizePrefixed(
         CreateResyncBlobDataBatchDirect(builder_, &blob_entries, end_of_shard));
-    LOGD("create blobs snapshot data batch: shard_id={}, batch_num={}, total_bytes={}, blob_num={}, end_of_shard={}",
+    LOGD("create blobs snapshot data batch: shard_seq_num={}, batch_num={}, total_bytes={}, blob_num={}, end_of_shard={}",
          cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id, total_bytes, blob_entries.size(), end_of_shard);
     pack_resync_message(data_blob, SyncMessageType::SHARD_BATCH);
diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp
index 3ad72c6..6116aa8 100644
--- a/src/lib/homestore_backend/replication_state_machine.cpp
+++ b/src/lib/homestore_backend/replication_state_machine.cpp
@@ -270,6 +270,7 @@ int ReplicationStateMachine::read_snapshot_obj(std::shared_ptr< homestore::snaps
     auto obj_id = objId(snp_obj->offset);
     log_str = fmt::format("{} shard_seq_num={} batch_num={}", log_str, obj_id.shard_seq_num, obj_id.batch_id);
+    LOGD("read current snp obj {}", log_str);
     //invalid Id
     if (!pg_iter->update_cursor(obj_id)) {
         LOGW("Invalid objId in snapshot read, {}, current shard_seq_num={}, current batch_num={}",
@@ -350,7 +351,8 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
     // Check if the snapshot context is same as the current snapshot context.
     // If not, drop the previous context and re-init a new one
     if (m_snp_rcv_handler->get_context_lsn() != context->get_lsn()) {
-        m_snp_rcv_handler->reset_context(pg_data->pg_id(), context->get_lsn());
+        LOGI("reset context from lsn:{} to lsn:{}", m_snp_rcv_handler->get_context_lsn(), context->get_lsn());
+        m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id());
         // TODO: Reset all data of current PG - let's resync on a pristine base
     }
diff --git a/src/lib/homestore_backend/resync_pg_data.fbs b/src/lib/homestore_backend/resync_pg_data.fbs
index 3d28097..bf7d059 100644
--- a/src/lib/homestore_backend/resync_pg_data.fbs
+++ b/src/lib/homestore_backend/resync_pg_data.fbs
@@ -11,6 +11,8 @@ table Member {
 table ResyncPGMetaData {
     pg_id : uint16;              // only low 16 bit is used for pg_id;
     replica_set_uuid : [ubyte];  // uuid of replica set
+    pg_size : uint64;            // pg size;
+    chunk_size : uint64;         // chunk size;
     blob_seq_num : uint64;       // blob sequence number, used to assign next blob id;
     shard_seq_num: uint64;       // shard sequence number, used to assign next shard id;
     members : [Member];          // peers;
diff --git a/src/lib/homestore_backend/snapshot_receive_handler.cpp b/src/lib/homestore_backend/snapshot_receive_handler.cpp
index 0544cc9..195f7d0 100644
--- a/src/lib/homestore_backend/snapshot_receive_handler.cpp
+++ b/src/lib/homestore_backend/snapshot_receive_handler.cpp
@@ -21,6 +21,8 @@ int HSHomeObject::SnapshotReceiveHandler::process_pg_snapshot_data(ResyncPGMetaD
     // Create local PG
     PGInfo pg_info(pg_meta.pg_id());
+    pg_info.size = pg_meta.pg_size();
+    pg_info.chunk_size = pg_meta.chunk_size();
     std::copy_n(pg_meta.replica_set_uuid()->data(), 16, pg_info.replica_set_uuid.begin());
     for (unsigned int i = 0; i < pg_meta.members()->size(); i++) {
         const auto member = pg_meta.members()->Get(i);
diff --git a/src/lib/homestore_backend/tests/hs_blob_tests.cpp b/src/lib/homestore_backend/tests/hs_blob_tests.cpp
index 958aa3a..4dd417b 100644
--- a/src/lib/homestore_backend/tests/hs_blob_tests.cpp
+++ b/src/lib/homestore_backend/tests/hs_blob_tests.cpp
@@ -196,6 +196,8 @@ TEST_F(HomeObjectFixture, PGBlobIterator) {
     auto u1 = pg_msg->replica_set_uuid();
     auto u2 = pg->pg_info_.replica_set_uuid;
     ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end()));
+    ASSERT_EQ(pg_msg->pg_size(), pg->pg_info_.size);
+    ASSERT_EQ(pg_msg->chunk_size(), pg->pg_info_.chunk_size);
     ASSERT_EQ(pg_msg->blob_seq_num(), pg->durable_entities().blob_sequence_num.load());
     ASSERT_EQ(pg_msg->shard_seq_num(), pg->shard_sequence_num_);
@@ -300,6 +302,9 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
     // We have to create a PG first to init repl_dev
     constexpr pg_id_t pg_id = 1;
     create_pg(pg_id); // to create repl dev
+    auto iter = _obj_inst->_pg_map.find(pg_id);
+    ASSERT_TRUE(iter != _obj_inst->_pg_map.end());
+    auto pg = iter->second.get();
     PGStats stats;
     ASSERT_TRUE(_obj_inst->pg_manager()->get_stats(pg_id, stats));
     auto r_dev = homestore::HomeStore::instance()->repl_service().get_repl_dev(stats.replica_set_uuid);
@@ -324,7 +329,7 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
        shard_ids.push_back(i);
     }
     auto pg_entry =
-        CreateResyncPGMetaDataDirect(builder, pg_id, &uuid, blob_seq_num, num_shards_per_pg, &members, &shard_ids);
+        CreateResyncPGMetaDataDirect(builder, pg_id, &uuid, pg->pg_info_.size, pg->pg_info_.chunk_size, blob_seq_num, num_shards_per_pg, &members, &shard_ids);
     builder.Finish(pg_entry);
     auto pg_meta = GetResyncPGMetaData(builder.GetBufferPointer());
     auto ret = handler->process_pg_snapshot_data(*pg_meta);
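
Note on the cur_shard_idx_ change in hs_homeobject.hpp and the static_cast added in expected_next_obj_id(): an unsigned index cannot represent "before the first shard", and shard_list_.size() - 1 wraps around to SIZE_MAX when the shard list is empty, so the old cur_shard_idx_ < shard_list_.size() - 1 check could pass spuriously. The snippet below is a minimal standalone sketch of that pitfall under those assumptions, not code from this PR; shard_list, old_idx and new_idx are illustrative stand-ins for the patched members.

#include <cstdint>
#include <iostream>
#include <vector>

int main() {
    std::vector<int> shard_list;   // stand-in for shard_list_, still empty
    uint64_t old_idx = 0;          // old representation: unsigned, starts at the "first" shard
    int64_t new_idx = -1;          // new representation: signed, -1 means "before the first shard"

    // Old check: for an empty vector, shard_list.size() - 1 wraps around to SIZE_MAX,
    // so the comparison is true even though there is no next shard.
    bool old_has_next = old_idx < shard_list.size() - 1;

    // New check, mirroring the patch: cast the size expression to a signed type first.
    // For an empty list it becomes -1 on mainstream compilers, and -1 < -1 is false.
    bool new_has_next = new_idx < static_cast<int64_t>(shard_list.size() - 1);

    std::cout << old_has_next << " " << new_has_next << "\n";  // prints "1 0"
    return 0;
}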