
Commit

Merge branch 'origin/main' into adapt_repldev
zichanglai committed Sep 29, 2023
2 parents 29bbe93 + c251a37 commit a12463b
Showing 16 changed files with 50 additions and 52 deletions.
1 change: 0 additions & 1 deletion .clang-format
@@ -16,7 +16,6 @@ AlignConsecutiveDeclarations: false
AlignEscapedNewlines: Right
AlignOperands: false
AlignTrailingComments: true
AllowShortBlocksOnASingleLine: true
AllowShortIfStatementsOnASingleLine: true
AllowShortBlocksOnASingleLine: true
AllowShortCaseLabelsOnASingleLine: false
9 changes: 8 additions & 1 deletion src/lib/homeobject_impl.hpp
@@ -7,6 +7,13 @@

#include <sisl/logging/logging.h>

#define LOGT(...) LOGTRACEMOD(homeobject, ##__VA_ARGS__)
#define LOGD(...) LOGDEBUGMOD(homeobject, ##__VA_ARGS__)
#define LOGI(...) LOGINFOMOD(homeobject, ##__VA_ARGS__)
#define LOGW(...) LOGWARNMOD(homeobject, ##__VA_ARGS__)
#define LOGE(...) LOGERRORMOD(homeobject, ##__VA_ARGS__)
#define LOGC(...) LOGCRITICALMOD(homeobject, ##__VA_ARGS__)

namespace homestore {
class ReplicationService;
}
@@ -89,7 +96,7 @@ class HomeObjectImpl : public HomeObject,

///
mutable std::shared_mutex _pg_lock;
std::map< pg_id_t, shared< PG > > _pg_map;
std::map< pg_id_t, unique< PG > > _pg_map;

mutable std::shared_mutex _shard_lock;
std::map< shard_id_t, ShardIterator > _shard_map;
12 changes: 6 additions & 6 deletions src/lib/homestore_backend/heap_chunk_selector.cpp
@@ -22,7 +22,7 @@ void HeapChunkSelector::add_chunk(csharedChunk& chunk) { m_chunks.emplace(VChunk
void HeapChunkSelector::add_chunk_internal(const chunk_num_t chunkID) {
if (m_chunks.find(chunkID) == m_chunks.end()) {
// sanity check
LOGWARN("No chunk found for ChunkID {}", chunkID);
LOGWARNMOD(homeobject, "No chunk found for ChunkID {}", chunkID);
return;
}
const auto& chunk = m_chunks[chunkID];
@@ -44,7 +44,7 @@ void HeapChunkSelector::add_chunk_internal(const chunk_num_t chunkID) {
csharedChunk HeapChunkSelector::select_chunk(homestore::blk_count_t count, const homestore::blk_alloc_hints& hint) {
auto& chunkIdHint = hint.chunk_id_hint;
if (chunkIdHint.has_value()) {
LOGWARN("should not allocated a chunk with exiting chunk_id {} in hint!", chunkIdHint.value());
LOGWARNMOD(homeobject, "should not allocated a chunk with exiting chunk_id {} in hint!", chunkIdHint.value());
return nullptr;
}

@@ -62,7 +62,7 @@ csharedChunk HeapChunkSelector::select_chunk(homestore::blk_count_t count, const
return lhs.second->available_blk_count.load() < rhs.second->available_blk_count.load();
});
if (it == m_per_dev_heap.end()) {
LOGWARN("No pdev found for new pg");
LOGWARNMOD(homeobject, "No pdev found for new pg");
return nullptr;
}
pdevID = it->first;
@@ -72,7 +72,7 @@ csharedChunk HeapChunkSelector::select_chunk(homestore::blk_count_t count, const

auto it = m_per_dev_heap.find(pdevID);
if (it == m_per_dev_heap.end()) {
LOGWARN("No pdev found for pdev {}", pdevID);
LOGWARNMOD(homeobject, "No pdev found for pdev {}", pdevID);
return nullptr;
}

@@ -90,7 +90,7 @@ csharedChunk HeapChunkSelector::select_chunk(homestore::blk_count_t count, const
auto& avalableBlkCounter = it->second->available_blk_count;
avalableBlkCounter.fetch_sub(vchunk.available_blks());
} else {
LOGWARN("No pdev found for pdev {}", pdevID);
LOGWARNMOD(homeobject, "No pdev found for pdev {}", pdevID);
}

return vchunk.get_internal_chunk();
@@ -106,7 +106,7 @@ void HeapChunkSelector::release_chunk(const chunk_num_t chunkID) {
const auto& it = m_chunks.find(chunkID);
if (it == m_chunks.end()) {
// sanity check
LOGWARN("No chunk found for ChunkID {}", chunkID);
LOGWARNMOD(homeobject, "No chunk found for ChunkID {}", chunkID);
} else {
add_chunk_internal(chunkID);
}
8 changes: 4 additions & 4 deletions src/lib/homestore_backend/hs_homeobject.cpp
@@ -11,7 +11,7 @@
namespace homeobject {

extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application) {
LOGINFOMOD(homeobject, "Initializing HomeObject");
LOGI("Initializing HomeObject");
auto instance = std::make_shared< HSHomeObject >(std::move(application));
instance->init_homestore();
return instance;
@@ -26,12 +26,12 @@ void HSHomeObject::init_homestore() {
auto app = _application.lock();
RELEASE_ASSERT(app, "HomeObjectApplication lifetime unexpected!");

LOGINFO("Starting iomgr with {} threads, spdk: {}", app->threads(), false);
LOGI("Starting iomgr with {} threads, spdk: {}", app->threads(), false);
ioenvironment.with_iomgr(iomgr::iomgr_params{.num_threads = app->threads(), .is_spdk = app->spdk_mode()});

/// TODO Where should this come from?
const uint64_t app_mem_size = 2 * Gi;
LOGINFO("Initialize and start HomeStore with app_mem_size = {}", homestore::in_bytes(app_mem_size));
LOGI("Initialize and start HomeStore with app_mem_size = {}", homestore::in_bytes(app_mem_size));

std::vector< homestore::dev_info > device_info;
for (auto const& path : app->devices()) {
@@ -46,7 +46,7 @@ void HSHomeObject::init_homestore() {
.start(hs_input_params{.devices = device_info, .app_mem_size = app_mem_size},
[this]() { register_homestore_metablk_callback(); });
if (need_format) {
LOGWARN("Seems like we are booting/starting first time, Formatting!!");
LOGW("Seems like we are booting/starting first time, Formatting!!");
HomeStore::instance()->format_and_start({
{HS_SERVICE::META, hs_format_params{.size_pct = 5.0}},
{HS_SERVICE::LOG_REPLICATED, hs_format_params{.size_pct = 10.0}},
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/hs_homeobject.hpp
@@ -89,7 +89,7 @@ class HSHomeObject : public HomeObjectImpl {
private:
static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); }

void add_pg_to_map(shared< HS_PG > hs_pg);
void add_pg_to_map(unique< HS_PG > hs_pg);
shard_id_t generate_new_shard_id(pg_id_t pg);
uint64_t get_sequence_num_from_shard_id(uint64_t shard_id_t);

8 changes: 4 additions & 4 deletions src/lib/homestore_backend/hs_pg_manager.cpp
@@ -52,7 +52,7 @@ PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set<
.create_repl_dev(pg_info.replica_set_uuid, std::move(peers), std::make_unique< ReplicationStateMachine >(this))
.thenValue([this, pg_info = std::move(pg_info)](auto&& v) -> PGManager::NullResult {
if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); }
add_pg_to_map(std::make_shared< HS_PG >(std::move(pg_info), std::move(v.value())));
add_pg_to_map(std::make_unique< HS_PG >(std::move(pg_info), std::move(v.value())));
return folly::Unit();
});
}
@@ -62,7 +62,7 @@ PGManager::NullAsyncResult HSHomeObject::_replace_member(pg_id_t id, peer_id_t c
return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP));
}

void HSHomeObject::add_pg_to_map(shared< HS_PG > hs_pg) {
void HSHomeObject::add_pg_to_map(unique< HS_PG > hs_pg) {
RELEASE_ASSERT(hs_pg->pg_info_.replica_set_uuid == hs_pg->repl_dev_->group_id(),
"PGInfo replica set uuid mismatch with ReplDev instance for {}",
boost::uuids::to_string(hs_pg->pg_info_.replica_set_uuid));
@@ -117,11 +117,11 @@ void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_c
.thenValue([this, pg_sb = std::move(pg_sb)](auto&& v) {
if (v.hasError()) {
// TODO: We need to raise an alert here, since without pg repl_dev all operations on that pg will fail
LOGERROR("open_repl_dev for group_id={} has failed", boost::uuids::to_string(pg_sb->replica_set_uuid));
LOGE("open_repl_dev for group_id={} has failed", boost::uuids::to_string(pg_sb->replica_set_uuid));
return;
}
add_pg_to_map(std::make_shared< HS_PG >(pg_sb, std::move(v.value())));

add_pg_to_map(std::make_unique< HS_PG >(pg_sb, std::move(v.value())));
// check if any shard recovery is pending by this pg;
std::scoped_lock lock_guard(recovery_mutex_);
auto iter = pending_recovery_shards_.find(pg_sb->id);
8 changes: 5 additions & 3 deletions src/lib/homestore_backend/hs_shard_manager.cpp
@@ -61,14 +61,14 @@ ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_owner,
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_owner);
if (iter == _pg_map.end()) {
LOGWARN("failed to create shard with non-exist pg [{}]", pg_owner);
LOGW("failed to create shard with non-exist pg [{}]", pg_owner);
return folly::makeUnexpected(ShardError::UNKNOWN_PG);
}
repl_dev = std::static_pointer_cast< HS_PG >(iter->second)->repl_dev_;
repl_dev = static_cast< HS_PG* >(iter->second.get())->repl_dev_;
}

if (!repl_dev) {
LOGWARN("failed to get repl dev instance for pg [{}]", pg_owner);
LOGW("failed to get repl dev instance for pg [{}]", pg_owner);
return folly::makeUnexpected(ShardError::PG_NOT_READY);
}

@@ -144,6 +144,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header

void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader& header,
homestore::MultiBlkId const& blkids, sisl::blob value,

cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};
if (hs_ctx != nullptr) {
@@ -156,6 +157,7 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader
return;
}


if (crc32_ieee(init_crc32, value.bytes, value.size) != header.payload_crc) {
// header & value is inconsistent;
LOGWARNMOD(homeobject, "replication message header is inconsistent with value, lsn:{}", lsn);
6 changes: 3 additions & 3 deletions src/lib/homestore_backend/replication_state_machine.cpp
@@ -4,7 +4,7 @@
namespace homeobject {
void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& ctx) {
LOGINFO("applying raft log commit with lsn:{}", lsn);
LOGI("applying raft log commit with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.bytes);
switch (msg_header->msg_type) {
case ReplicationMessageType::CREATE_SHARD_MSG:
@@ -22,7 +22,7 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c

bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const&, sisl::blob const&,
cintrusive< homestore::repl_req_ctx >&) {
LOGINFO("on_pre_commit with lsn:{}", lsn);
LOGI("on_pre_commit with lsn:{}", lsn);
// For shard creation, since homestore repldev inside will write shard header to data service first before this
// function is called. So there is nothing is needed to do and we can get the binding chunk_id with the newly shard
// from the blkid in on_commit()
@@ -31,7 +31,7 @@ bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const&, sisl

void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const&, sisl::blob const&,
cintrusive< homestore::repl_req_ctx >&) {
LOGINFO("on_rollback with lsn:{}", lsn);
LOGI("on_rollback with lsn:{}", lsn);
}

homestore::blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header,
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/tests/test_home_object.cpp
@@ -31,7 +31,7 @@ class FixtureApp : public homeobject::HomeObjectApplication {
bool spdk_mode() const override { return false; }
uint32_t threads() const override { return 2; }
std::list< std::filesystem::path > devices() const override {
LOGINFO("creating {} device file with size={}", fpath_, homestore::in_bytes(2 * Gi));
LOGI("creating {} device file with size={}", fpath_, homestore::in_bytes(2 * Gi));
if (std::filesystem::exists(fpath_)) { std::filesystem::remove(fpath_); }
std::ofstream ofs{fpath_, std::ios::binary | std::ios::out | std::ios::trunc};
std::filesystem::resize_file(fpath_, 2 * Gi);
6 changes: 3 additions & 3 deletions src/lib/homestore_backend/tests/test_shard_manager.cpp
@@ -35,7 +35,7 @@ class FixtureApp : public homeobject::HomeObjectApplication {
bool spdk_mode() const override { return false; }
uint32_t threads() const override { return 2; }
std::list< std::filesystem::path > devices() const override {
LOGINFO("creating {} device file with size={}", fpath_, homestore::in_bytes(2 * Gi));
LOGI("creating {} device file with size={}", fpath_, homestore::in_bytes(2 * Gi));
if (std::filesystem::exists(fpath_)) { std::filesystem::remove(fpath_); }
std::ofstream ofs{fpath_, std::ios::binary | std::ios::out | std::ios::trunc};
std::filesystem::resize_file(fpath_, 2 * Gi);
@@ -183,7 +183,7 @@ TEST_F(ShardManagerTesting, MockSealShard) {
auto seal_shard_msg = j.dump();

homeobject::HSHomeObject* ho = dynamic_cast< homeobject::HSHomeObject* >(_home_object.get());
auto pg = dp_cast< homeobject::HSHomeObject::HS_PG >(ho->_pg_map[_pg_id]);
auto* pg = s_cast<homeobject::HSHomeObject::HS_PG* >(ho->_pg_map[_pg_id].get());
auto repl_dev = pg->repl_dev_;
const auto msg_size = sisl::round_up(seal_shard_msg.size(), repl_dev->get_blk_size());
auto req = homeobject::repl_result_ctx< ShardManager::Result< ShardInfo > >::make(msg_size, 512 /*alignment*/);
@@ -212,7 +212,7 @@

auto pg_iter = ho->_pg_map.find(_pg_id);
EXPECT_TRUE(pg_iter != ho->_pg_map.end());
auto pg_result = pg_iter->second;
auto& pg_result = pg_iter->second;
EXPECT_EQ(1, pg_result->shards_.size());
auto& check_shard = pg_result->shards_.front();
EXPECT_EQ(ShardInfo::State::SEALED, check_shard->info.state);
9 changes: 4 additions & 5 deletions src/lib/memory_backend/mem_blob_manager.cpp
@@ -9,11 +9,11 @@ namespace homeobject {

#define WITH_ROUTE(blob) \
auto const route = BlobRoute{_shard.id, (blob)}; \
LOGTRACEMOD(homeobject, "[route={}]", route);
LOGT("[route={}]", route);

#define IF_BLOB_ALIVE \
if (auto blob_it = shard.btree_.find(route); shard.btree_.end() == blob_it || !blob_it->second) { \
LOGWARNMOD(homeobject, "[route={}] missing", route); \
LOGW("[route={}] missing", route); \
return folly::makeUnexpected(BlobError::UNKNOWN_BLOB); \
} else

@@ -40,9 +40,8 @@ BlobManager::NullResult MemoryHomeObject::_del_blob(ShardInfo const& _shard, blo
WITH_SHARD
WITH_ROUTE(_blob)
IF_BLOB_ALIVE {
auto del_blob = BlobExt();
del_blob.blob_ = blob_it->second.blob_;
shard.btree_.assign_if_equal(route, blob_it->second, std::move(del_blob));
shard.btree_.assign_if_equal(route, blob_it->second,
BlobExt{.state_ = BlobState::DELETED, .blob_ = blob_it->second.blob_});
return folly::Unit();
}
}
15 changes: 3 additions & 12 deletions src/lib/memory_backend/mem_homeobject.cpp
@@ -1,25 +1,16 @@
#include "mem_homeobject.hpp"

#include <boost/uuid/random_generator.hpp>

namespace homeobject {

/// NOTE: We give ourselves the option to provide a different HR instance here than libhomeobject.a
extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application) {
return std::make_shared< MemoryHomeObject >(std::move(application));
}

#if 0
void HomeObjectImpl::init_repl_svc() {
auto lg = std::scoped_lock(_repl_lock);
if (!_repl_svc) {
_our_id = boost::uuids::random_generator()();
LOGINFOMOD(homeobject, "SvcId faked: {}", to_string(_our_id));
_our_id = _application.lock()->discover_svcid(_our_id);
_repl_svc = home_replication::create_repl_service([](auto) { return nullptr; });
}
MemoryHomeObject::MemoryHomeObject(std::weak_ptr< HomeObjectApplication >&& application) :
HomeObjectImpl::HomeObjectImpl(std::move(application)) {
_our_id = _application.lock()->discover_svcid(_our_id);
}
#endif

ShardIndex::~ShardIndex() {
for (auto it = btree_.begin(); it != btree_.end(); ++it) {
2 changes: 1 addition & 1 deletion src/lib/memory_backend/mem_homeobject.hpp
@@ -52,7 +52,7 @@ class MemoryHomeObject : public HomeObjectImpl {
ShardIndex& _find_index(shard_id_t) const;

public:
using HomeObjectImpl::HomeObjectImpl;
MemoryHomeObject(std::weak_ptr< HomeObjectApplication >&& application);
~MemoryHomeObject() override = default;
};

4 changes: 2 additions & 2 deletions src/lib/memory_backend/mem_pg_manager.cpp
@@ -3,7 +3,7 @@
namespace homeobject {
PGManager::NullAsyncResult MemoryHomeObject::_create_pg(PGInfo&& pg_info, std::set< std::string, std::less<> >) {
auto lg = std::scoped_lock(_pg_lock);
auto [it1, _] = _pg_map.try_emplace(pg_info.id, std::make_shared< PG >(pg_info));
auto [it1, _] = _pg_map.try_emplace(pg_info.id, std::make_unique< PG >(pg_info));
RELEASE_ASSERT(_pg_map.end() != it1, "Unknown map insert error!");
return folly::makeSemiFuture< PGManager::NullResult >(folly::Unit());
}
@@ -12,4 +12,4 @@ PGManager::NullAsyncResult MemoryHomeObject::_replace_member(pg_id_t id, peer_id
PGMember const& new_member) {
return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP));
}
} // namespace homeobject
} // namespace homeobject
8 changes: 4 additions & 4 deletions src/lib/pg_manager.cpp
@@ -7,7 +7,7 @@ namespace homeobject {
std::shared_ptr< PGManager > HomeObjectImpl::pg_manager() { return shared_from_this(); }

PGManager::NullAsyncResult HomeObjectImpl::create_pg(PGInfo&& pg_info) {
LOGINFO("Creating PG: [{}] of [{}] members", pg_info.id, pg_info.members.size());
LOGI("[pg={}] has [{}] members", pg_info.id, pg_info.members.size());
auto saw_ourself = false;
auto saw_leader = false;
auto peers = std::set< std::string, std::less<> >();
@@ -22,14 +22,14 @@ PGManager::NullAsyncResult HomeObjectImpl::create_pg(PGInfo&& pg_info) {

PGManager::NullAsyncResult HomeObjectImpl::replace_member(pg_id_t id, peer_id_t const& old_member,
PGMember const& new_member) {
LOGINFO("Replacing PG: [{}] member [{}] with [{}]", id, to_string(old_member), to_string(new_member.id));
LOGI("[pg={}] replace member [{}] with [{}]", id, to_string(old_member), to_string(new_member.id));
if (old_member == new_member.id) {
LOGWARN("Rejecting replace_member with identical replacement SvcId [{}]!", to_string(old_member));
LOGW("rejecting identical replacement SvcId [{}]!", to_string(old_member));
return folly::makeUnexpected(PGError::INVALID_ARG);
}

if (old_member == our_uuid()) {
LOGWARN("Rejecting replace_member removing ourself {}!", to_string(old_member));
LOGW("refusing to remove ourself {}!", to_string(old_member));
return folly::makeUnexpected(PGError::INVALID_ARG);
}

2 changes: 1 addition & 1 deletion src/lib/shard_manager.cpp
@@ -21,7 +21,7 @@ ShardManager::AsyncResult< InfoList > HomeObjectImpl::list_shards(pg_id_t pgid)

auto info_l = std::list< ShardInfo >();
for (auto const& shard : pg->shards_) {
LOGDEBUG("Listing Shard {}", shard->info.id);
LOGD("found [shard={}]", shard->info.id);
info_l.push_back(shard->info);
}
return info_l;
