
Commit

Rename snapshot_data to snapshot_obj
This adapts to the corresponding homestore change, eBay/HomeStore#596.
yuwmao committed Dec 4, 2024
1 parent 6f519ad commit b9e8d79
Showing 2 changed files with 27 additions and 25 deletions.
48 changes: 25 additions & 23 deletions src/lib/homestore_backend/replication_state_machine.cpp
@@ -232,15 +232,17 @@ std::shared_ptr< homestore::snapshot_context > ReplicationStateMachine::last_snapshot()
}

int ReplicationStateMachine::read_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
- std::shared_ptr< homestore::snapshot_obj > snp_data) {
+ std::shared_ptr< homestore::snapshot_obj > snp_obj) {
HSHomeObject::PGBlobIterator* pg_iter = nullptr;
auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot();

- if (snp_data->user_ctx == nullptr) {
+ if (snp_obj->user_ctx == nullptr) {
// Create the pg blob iterator for the first time.
pg_iter = new HSHomeObject::PGBlobIterator(*home_object_, repl_dev()->group_id(), context->get_lsn());
- snp_data->user_ctx = (void*)pg_iter;
- } else { pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(snp_data->user_ctx); }
+ snp_obj->user_ctx = (void*)pg_iter;
+ } else {
+     pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(snp_obj->user_ctx);
+ }

// Nuraft uses obj_id as a way to track the state of the snapshot read and write.
// Nuraft starts with obj_id == 0 as first message always, leader send all the shards and
@@ -258,14 +260,14 @@ int ReplicationStateMachine::read_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
boost::uuids::to_string(repl_dev()->group_id()), s->get_last_log_term(),
s->get_last_log_idx());
//TODO snp_data->offset is int64, need to change to uint64 in homestore
- if (snp_data->offset == int64_t(LAST_OBJ_ID)) {
+ if (snp_obj->offset == LAST_OBJ_ID) {
// No more shards to read, baseline resync is finished after this.
- snp_data->is_last_obj = true;
+ snp_obj->is_last_obj = true;
LOGD("Read snapshot end, {}", log_str);
return 0;
}

- auto obj_id = objId(snp_data->offset);
+ auto obj_id = objId(snp_obj->offset);
log_str = fmt::format("{} shard_seq_num={} batch_num={}", log_str, obj_id.shard_seq_num, obj_id.batch_id);

//invalid Id
@@ -278,7 +280,7 @@ int ReplicationStateMachine::read_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
//pg metadata message
//shardId starts from 1
if (obj_id.shard_seq_num == 0) {
- if (!pg_iter->create_pg_snapshot_data(snp_data->blob)) {
+ if (!pg_iter->create_pg_snapshot_data(snp_obj->blob)) {
LOGE("Failed to create pg snapshot data for snapshot read, {}", log_str);
return -1;
}
@@ -290,54 +292,54 @@ int ReplicationStateMachine::read_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
LOGE("Failed to generate shard blob list for snapshot read, {}", log_str);
return -1;
}
- if (!pg_iter->create_shard_snapshot_data(snp_data->blob)) {
+ if (!pg_iter->create_shard_snapshot_data(snp_obj->blob)) {
LOGE("Failed to create shard meta data for snapshot read, {}", log_str);
return -1;
}
return 0;
}
//general blob message
- if (!pg_iter->create_blobs_snapshot_data(snp_data->blob)) {
+ if (!pg_iter->create_blobs_snapshot_data(snp_obj->blob)) {
LOGE("Failed to create blob batch data for snapshot read, {}", log_str);
return -1;
}
return 0;
}

void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
- std::shared_ptr< homestore::snapshot_obj > snp_data) {
+ std::shared_ptr< homestore::snapshot_obj > snp_obj) {
RELEASE_ASSERT(context != nullptr, "Context null");
- RELEASE_ASSERT(snp_data != nullptr, "Snapshot data null");
+ RELEASE_ASSERT(snp_obj != nullptr, "Snapshot data null");
auto r_dev = repl_dev();
if (!m_snp_rcv_handler) {
m_snp_rcv_handler = std::make_unique< HSHomeObject::SnapshotReceiveHandler >(*home_object_, r_dev);
}

auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot();
- auto obj_id = objId(static_cast< snp_obj_id_t >(snp_data->offset));
+ auto obj_id = objId(static_cast< snp_obj_id_t >(snp_obj->offset));
auto log_suffix = fmt::format("group={} term={} lsn={} shard={} batch_num={} size={}",
uuids::to_string(r_dev->group_id()), s->get_last_log_term(), s->get_last_log_idx(),
- obj_id.shard_seq_num, obj_id.batch_id, snp_data->blob.size());
+ obj_id.shard_seq_num, obj_id.batch_id, snp_obj->blob.size());

- if (snp_data->is_last_obj) {
+ if (snp_obj->is_last_obj) {
LOGD("Write snapshot reached is_last_obj true {}", log_suffix);
return;
}

// Check message integrity
// TODO: add a flip here to simulate corrupted message
- auto header = r_cast< const SyncMessageHeader* >(snp_data->blob.cbytes());
+ auto header = r_cast< const SyncMessageHeader* >(snp_obj->blob.cbytes());
if (header->corrupted()) {
LOGE("corrupted message in write_snapshot_data, lsn:{}, obj_id {} shard {} batch {}", s->get_last_log_idx(),
obj_id.value, obj_id.shard_seq_num, obj_id.batch_id);
return;
}
- if (auto payload_size = snp_data->blob.size() - sizeof(SyncMessageHeader); payload_size != header->payload_size) {
+ if (auto payload_size = snp_obj->blob.size() - sizeof(SyncMessageHeader); payload_size != header->payload_size) {
LOGE("payload size mismatch in write_snapshot_data {} != {}, lsn:{}, obj_id {} shard {} batch {}", payload_size,
header->payload_size, s->get_last_log_idx(), obj_id.value, obj_id.shard_seq_num, obj_id.batch_id);
return;
}
- auto data_buf = snp_data->blob.cbytes() + sizeof(SyncMessageHeader);
+ auto data_buf = snp_obj->blob.cbytes() + sizeof(SyncMessageHeader);

if (obj_id.shard_seq_num == 0) {
// PG metadata & shard list message
@@ -359,7 +361,7 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
obj_id.value, obj_id.shard_seq_num, obj_id.batch_id, ret);
return;
}
- snp_data->offset =
+ snp_obj->offset =
objId(HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->get_next_shard()), 0).value;
LOGD("Write snapshot, processed PG data pg_id:{} {}", pg_data->pg_id(), log_suffix);
return;
@@ -380,7 +382,7 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
return;
}
// Request for the next batch
- snp_data->offset = objId(obj_id.shard_seq_num, 1).value;
+ snp_obj->offset = objId(obj_id.shard_seq_num, 1).value;
LOGD("Write snapshot, processed shard data shard_seq_num:{} {}", obj_id.shard_seq_num, log_suffix);
return;
}
@@ -403,12 +405,12 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
if (blob_batch->is_last_batch()) {
auto next_shard = m_snp_rcv_handler->get_next_shard();
if (next_shard == HSHomeObject::SnapshotReceiveHandler::shard_list_end_marker) {
- snp_data->offset = LAST_OBJ_ID;
+ snp_obj->offset = LAST_OBJ_ID;
} else {
- snp_data->offset = objId(HSHomeObject::get_sequence_num_from_shard_id(next_shard), 0).value;
+ snp_obj->offset = objId(HSHomeObject::get_sequence_num_from_shard_id(next_shard), 0).value;
}
} else {
- snp_data->offset = objId(obj_id.shard_seq_num, obj_id.batch_id + 1).value;
+ snp_obj->offset = objId(obj_id.shard_seq_num, obj_id.batch_id + 1).value;
}

LOGD("Write snapshot, processed blob data shard_seq_num:{} batch_num:{} {}", obj_id.shard_seq_num, obj_id.batch_id,
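The handshake above hinges on objId, which packs a shard sequence number and a batch number into the single offset value that Nuraft round-trips between leader and follower. Below is a minimal sketch of that encoding, inferred from the calls objId(shard_seq_num, batch_id).value and objId(snp_obj->offset) in this diff; the 48/16-bit split, the field widths, and the LAST_OBJ_ID value are assumptions for illustration, not the actual HomeObject definitions.

#include <cstdint>

// Hypothetical packing for objId, inferred from its uses above; the real
// HomeObject layout may differ.
using snp_obj_id_t = uint64_t;
constexpr uint64_t LAST_OBJ_ID = UINT64_MAX; // assumed sentinel: resync finished

struct objId {
    uint64_t value;         // what gets written into snp_obj->offset
    uint64_t shard_seq_num; // 0 => PG metadata message; shard ids start from 1
    uint16_t batch_id;      // 0 => shard metadata, >= 1 => blob batches

    objId(uint64_t seq, uint16_t batch)
        : value((seq << 16) | batch), shard_seq_num(seq), batch_id(batch) {}
    explicit objId(snp_obj_id_t offset)
        : value(offset), shard_seq_num(offset >> 16),
          batch_id(static_cast< uint16_t >(offset & 0xFFFF)) {}
};

With an encoding like this, read_snapshot_obj dispatches on the decoded fields: shard_seq_num == 0 serves the PG metadata message, batch_id == 0 serves a shard's metadata, any other batch_id serves a blob batch, and an offset of LAST_OBJ_ID ends the baseline resync.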
4 changes: 2 additions & 2 deletions src/lib/homestore_backend/replication_state_machine.hpp
@@ -181,9 +181,9 @@ class ReplicationStateMachine : public homestore::ReplDevListener {
virtual bool apply_snapshot(std::shared_ptr< homestore::snapshot_context > context) override;
virtual std::shared_ptr< homestore::snapshot_context > last_snapshot() override;
virtual int read_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
- std::shared_ptr< homestore::snapshot_obj > snp_data) override;
+ std::shared_ptr< homestore::snapshot_obj > snp_obj) override;
virtual void write_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
- std::shared_ptr< homestore::snapshot_obj > snp_data) override;
+ std::shared_ptr< homestore::snapshot_obj > snp_obj) override;
virtual void free_user_snp_ctx(void*& user_snp_ctx) override;

private:
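Taken together, the renamed overrides implement a request-the-next-object handshake: after consuming each object, the follower writes the id of the object it wants next into snp_obj->offset, and the leader serves that id on the following read_snapshot_obj call. The sketch below distills that advancement rule from the branches of write_snapshot_obj in the first file; it reuses the hypothetical objId and LAST_OBJ_ID from the earlier sketch, and the bool parameters stand in for the blob_batch->is_last_batch() and shard_list_end_marker checks that the real code gets from SnapshotReceiveHandler.

// Distilled from write_snapshot_obj: choose the offset the follower hands
// back to request the next snapshot object. Parameter names are illustrative.
uint64_t next_offset(const objId& cur, bool is_last_batch,
                     uint64_t next_shard_seq, bool shards_done) {
    if (cur.shard_seq_num == 0) {
        // PG metadata consumed: request the first shard's metadata (batch 0).
        return objId(next_shard_seq, 0).value;
    }
    if (cur.batch_id == 0) {
        // Shard metadata consumed: request this shard's first blob batch.
        return objId(cur.shard_seq_num, 1).value;
    }
    if (!is_last_batch) {
        // More blobs in this shard: request the next batch.
        return objId(cur.shard_seq_num, cur.batch_id + 1).value;
    }
    // Shard exhausted: move to the next shard, or signal completion.
    return shards_done ? LAST_OBJ_ID : objId(next_shard_seq, 0).value;
}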
