Skip to content

Commit

Permalink
Fix some bugs in baseline_resync
Browse files: browse the repository at this point in the history
  • Loading branch information
yuwmao committed Dec 9, 2024
1 parent b9e8d79 commit fd081cc
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 9 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def build_requirements(self):

def requirements(self):
self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
self.requires("homestore/[^6.4]@oss/master")
self.requires("homestore/6.5.19@oss/master")
self.requires("iomgr/[^11.3]@oss/master")
self.requires("lz4/1.9.4", override=True)
self.requires("openssl/3.3.1", override=True)
Expand Down
4 changes: 2 additions & 2 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ class HSHomeObject : public HomeObjectImpl {

std::vector< ShardEntry > shard_list_{0};

objId cur_obj_id_{1, 0};
uint64_t cur_shard_idx_{0};
objId cur_obj_id_{0, 0};
int64_t cur_shard_idx_{-1};
std::vector<BlobInfo> cur_blob_list_{0};
uint64_t cur_start_blob_idx_{0};
uint64_t cur_batch_blob_count_{0};
Expand Down
8 changes: 4 additions & 4 deletions src/lib/homestore_backend/pg_blob_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ objId HSHomeObject::PGBlobIterator::expected_next_obj_id() {
return objId(cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id + 1);
}
//next shard
if (cur_shard_idx_ < shard_list_.size() - 1) {
if (cur_shard_idx_ < static_cast< int64_t >(shard_list_.size() - 1)) {
auto next_shard_seq_num = shard_list_[cur_shard_idx_ + 1].info.id & 0xFFFFFFFFFFFF;
return objId(next_shard_seq_num, 0);
}
Expand Down Expand Up @@ -103,7 +103,7 @@ bool HSHomeObject::PGBlobIterator::create_pg_snapshot_data(sisl::io_blob_safe& m
shard_ids.push_back(shard.info.id);
}

auto pg_entry = CreateResyncPGMetaDataDirect(builder_, pg_info.id, &uuid, pg->durable_entities().blob_sequence_num,
auto pg_entry = CreateResyncPGMetaDataDirect(builder_, pg_info.id, &uuid, pg_info.size, pg_info.chunk_size, pg->durable_entities().blob_sequence_num,
pg->shard_sequence_num_, &members, &shard_ids);
builder_.FinishSizePrefixed(pg_entry);

Expand Down Expand Up @@ -212,7 +212,7 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
}

sisl::io_blob_safe blob;
auto retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry);
uint8_t retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry);
for (int i = 0; i < retries; i++) {
auto result = load_blob_data(info, state).get();
if (result.hasError() && result.error().code == BlobErrorCode::READ_FAILED) {
Expand Down Expand Up @@ -241,7 +241,7 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
builder_.FinishSizePrefixed(
CreateResyncBlobDataBatchDirect(builder_, &blob_entries, end_of_shard));

LOGD("create blobs snapshot data batch: shard_id={}, batch_num={}, total_bytes={}, blob_num={}, end_of_shard={}",
LOGD("create blobs snapshot data batch: shard_seq_num={}, batch_num={}, total_bytes={}, blob_num={}, end_of_shard={}",
cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id, total_bytes, blob_entries.size(), end_of_shard);

pack_resync_message(data_blob, SyncMessageType::SHARD_BATCH);
Expand Down
4 changes: 3 additions & 1 deletion src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ int ReplicationStateMachine::read_snapshot_obj(std::shared_ptr< homestore::snaps
auto obj_id = objId(snp_obj->offset);
log_str = fmt::format("{} shard_seq_num={} batch_num={}", log_str, obj_id.shard_seq_num, obj_id.batch_id);

LOGD("read current snp obj {}", log_str)
//invalid Id
if (!pg_iter->update_cursor(obj_id)) {
LOGW("Invalid objId in snapshot read, {}, current shard_seq_num={}, current batch_num={}",
Expand Down Expand Up @@ -350,7 +351,8 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
// Check if the snapshot context is same as the current snapshot context.
// If not, drop the previous context and re-init a new one
if (m_snp_rcv_handler->get_context_lsn() != context->get_lsn()) {
m_snp_rcv_handler->reset_context(pg_data->pg_id(), context->get_lsn());
LOGI("reset context from lsn:{} to lsn:{}", m_snp_rcv_handler->get_context_lsn(), context->get_lsn());
m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id());
// TODO: Reset all data of current PG - let's resync on a pristine base
}

Expand Down
2 changes: 2 additions & 0 deletions src/lib/homestore_backend/resync_pg_data.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ table Member {
table ResyncPGMetaData {
pg_id : uint16; // only low 16 bit is used for pg_id;
replica_set_uuid : [ubyte]; // uuid of replica set
pg_size : uint64; // pg size;
chunk_size : uint64; // chunk size;
blob_seq_num : uint64; // blob sequence number, used to assign next blob id;
shard_seq_num: uint64; // shard sequence number, used to assign next shard id;
members : [Member]; // peers;
Expand Down
2 changes: 2 additions & 0 deletions src/lib/homestore_backend/snapshot_receive_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ int HSHomeObject::SnapshotReceiveHandler::process_pg_snapshot_data(ResyncPGMetaD

// Create local PG
PGInfo pg_info(pg_meta.pg_id());
pg_info.size = pg_meta.pg_size();
pg_info.chunk_size = pg_meta.chunk_size();
std::copy_n(pg_meta.replica_set_uuid()->data(), 16, pg_info.replica_set_uuid.begin());
for (unsigned int i = 0; i < pg_meta.members()->size(); i++) {
const auto member = pg_meta.members()->Get(i);
Expand Down
7 changes: 6 additions & 1 deletion src/lib/homestore_backend/tests/hs_blob_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ TEST_F(HomeObjectFixture, PGBlobIterator) {
auto u1 = pg_msg->replica_set_uuid();
auto u2 = pg->pg_info_.replica_set_uuid;
ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end()));
ASSERT_EQ(pg_msg->pg_size(), pg->pg_info_.size);
ASSERT_EQ(pg_msg->chunk_size(), pg->pg_info_.chunk_size);
ASSERT_EQ(pg_msg->blob_seq_num(), pg->durable_entities().blob_sequence_num.load());
ASSERT_EQ(pg_msg->shard_seq_num(), pg->shard_sequence_num_);

Expand Down Expand Up @@ -300,6 +302,9 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
// We have to create a PG first to init repl_dev
constexpr pg_id_t pg_id = 1;
create_pg(pg_id); // to create repl dev
auto iter = _obj_inst->_pg_map.find(pg_id);
ASSERT_TRUE(iter != _obj_inst->_pg_map.end());
auto pg = iter->second.get();
PGStats stats;
ASSERT_TRUE(_obj_inst->pg_manager()->get_stats(pg_id, stats));
auto r_dev = homestore::HomeStore::instance()->repl_service().get_repl_dev(stats.replica_set_uuid);
Expand All @@ -324,7 +329,7 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
shard_ids.push_back(i);
}
auto pg_entry =
CreateResyncPGMetaDataDirect(builder, pg_id, &uuid, blob_seq_num, num_shards_per_pg, &members, &shard_ids);
CreateResyncPGMetaDataDirect(builder, pg_id, &uuid, pg->pg_info_.size, pg->pg_info_.chunk_size, blob_seq_num, num_shards_per_pg, &members, &shard_ids);
builder.Finish(pg_entry);
auto pg_meta = GetResyncPGMetaData(builder.GetBufferPointer());
auto ret = handler->process_pg_snapshot_data(*pg_meta);
Expand Down

0 comments on commit fd081cc

Please sign in to comment.