Skip to content

Commit

Permalink
Logging consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd committed Oct 5, 2023
1 parent 2d1bda7 commit d1378de
Show file tree
Hide file tree
Showing 16 changed files with 164 additions and 152 deletions.
6 changes: 3 additions & 3 deletions src/include/nuraft_mesg/mesg_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class group_factory : public grpc_factory {

class mesg_factory final : public grpc_factory {
std::shared_ptr< group_factory > _group_factory;
group_id_t const _group_name;
group_id_t const _group_id;
group_type_t const _group_type;
std::shared_ptr< sisl::MetricsGroupWrapper > _metrics;

Expand All @@ -59,11 +59,11 @@ class mesg_factory final : public grpc_factory {
std::shared_ptr< sisl::MetricsGroupWrapper > metrics = nullptr) :
grpc_factory(0, to_string(grp_id)),
_group_factory(g_factory),
_group_name(grp_id),
_group_id(grp_id),
_group_type(grp_type),
_metrics(metrics) {}

group_id_t group_name() const { return _group_name; }
group_id_t group_id() const { return _group_id; }

nuraft::cmd_result_code create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >& rpc_ptr) override;

Expand Down
12 changes: 12 additions & 0 deletions src/lib/common_lib.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#pragma once

#include <sisl/logging/logging.h>

#include "nuraft_mesg/common.hpp"

#define LOGT(...) LOGTRACEMOD(nuraft_mesg, ##__VA_ARGS__)
#define LOGD(...) LOGDEBUGMOD(nuraft_mesg, ##__VA_ARGS__)
#define LOGI(...) LOGINFOMOD(nuraft_mesg, ##__VA_ARGS__)
#define LOGW(...) LOGWARNMOD(nuraft_mesg, ##__VA_ARGS__)
#define LOGE(...) LOGERRORMOD(nuraft_mesg, ##__VA_ARGS__)
#define LOGC(...) LOGCRITICALMOD(nuraft_mesg, ##__VA_ARGS__)
6 changes: 3 additions & 3 deletions src/lib/data_service_grpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ bool data_service_grpc::bind(std::string const& request_name, group_id_t const&
data_service_request_handler_t const& request_cb) {
RELEASE_ASSERT(_grpc_server, "NULL _grpc_server!");
if (!request_cb) {
LOGWARNMOD(nuraft_mesg, "request_cb null for the request {}, cannot bind.", request_name);
LOGW("request_cb null for the request {}, cannot bind.", request_name);
return false;
}
// This is an async call, hence the "return false". The user should invoke rpc_data->send_response to finish the
// call
auto generic_handler_cb = [request_cb](boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) {
sisl::io_blob svr_buf;
if (auto status = deserialize_from_byte_buffer(rpc_data->request(), svr_buf); !status.ok()) {
LOGERRORMOD(nuraft_mesg, "ByteBuffer DumpToSingleSlice failed, {}", status.error_message());
LOGE(, "ByteBuffer DumpToSingleSlice failed, {}", status.error_message());
rpc_data->set_status(status);
return true; // respond immediately
}
Expand All @@ -50,7 +50,7 @@ bool data_service_grpc::bind(std::string const& request_name, group_id_t const&
throw std::runtime_error(fmt::format("Could not register generic rpc {} with gRPC!", request_name));
}
} else {
LOGWARNMOD(nuraft_mesg, "data service rpc {} exists", it->first);
LOGW(, "data service rpc {} exists", it->first);
return false;
}
} else {
Expand Down
10 changes: 4 additions & 6 deletions src/lib/grpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ inline std::shared_ptr< nuraft::resp_msg > toResponse(RaftMessage const& raft_ms
resp.next_index(), resp.accepted());
message->set_result_code((nuraft::cmd_result_code)(0 - resp.result_code()));
if (nuraft::cmd_result_code::NOT_LEADER == message->get_result_code()) {
LOGINFOMOD(nuraft_mesg, "Leader has changed!");
LOGI("Leader has changed!");
message->dest_addr = resp.dest_addr();
}
if (0 < resp.context().length()) {
Expand All @@ -67,16 +67,14 @@ inline std::shared_ptr< nuraft::resp_msg > toResponse(RaftMessage const& raft_ms

std::atomic_uint64_t grpc_base_client::_client_counter = 0ul;

void grpc_base_client::send(std::shared_ptr< nuraft::req_msg >& req, nuraft::rpc_handler& complete,
uint64_t) {
void grpc_base_client::send(std::shared_ptr< nuraft::req_msg >& req, nuraft::rpc_handler& complete, uint64_t) {
assert(req && complete);
RaftMessage grpc_request;
grpc_request.set_allocated_base(fromBaseRequest(*req));
grpc_request.set_allocated_rc_request(fromRCRequest(*req));

LOGTRACEMOD(nuraft_mesg, "Sending [{}] from: [{}] to: [{}]",
nuraft::msg_type_to_string(nuraft::msg_type(grpc_request.base().type())), grpc_request.base().src(),
grpc_request.base().dest());
LOGT("Sending [{}] from: [{}] to: [{}]", nuraft::msg_type_to_string(nuraft::msg_type(grpc_request.base().type())),
grpc_request.base().src(), grpc_request.base().dest());

send(grpc_request, [req, complete](RaftMessage& response, ::grpc::Status& status) mutable -> void {
std::shared_ptr< nuraft::rpc_exception > err;
Expand Down
14 changes: 7 additions & 7 deletions src/lib/grpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <sisl/grpc/rpc_client.hpp>
#include <sisl/logging/logging.h>

#include "nuraft_mesg/common.hpp"
#include "common_lib.hpp"

namespace nuraft_mesg {

Expand Down Expand Up @@ -76,13 +76,13 @@ class grpc_client : public grpc_base_client, public sisl::GrpcAsyncClient {

void init() override {
// Re-create channel only if current channel is busted.
if (!_stub || !is_connection_ready()) {
LOGDEBUGMOD(nuraft_mesg, "Client init ({}) to {}", (!!_stub ? "Again" : "First"), _addr);
sisl::GrpcAsyncClient::init();
_stub = sisl::GrpcAsyncClient::make_stub< TSERVICE >(_worker_name);
} else {
LOGDEBUGMOD(nuraft_mesg, "Channel looks fine, re-using");
if (_stub && is_connection_ready()) {
LOGD("Channel looks fine, re-using");
return;
}
LOGD("Client init ({}) to {}", (!!_stub ? "Again" : "First"), _addr);
sisl::GrpcAsyncClient::init();
_stub = sisl::GrpcAsyncClient::make_stub< TSERVICE >(_worker_name);
}

protected:
Expand Down
23 changes: 11 additions & 12 deletions src/lib/grpc_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,27 +93,26 @@ void respHandler(std::shared_ptr< ContextType > ctx, std::shared_ptr< nuraft::re
std::shared_ptr< nuraft::rpc_exception >& err) {
auto factory = ctx->cli_factory();
if (err || !rsp) {
LOGERROR("{}", (err ? err->what() : "No response."));
LOGE("{}", (err ? err->what() : "No response."));
ctx->set((rsp ? rsp->get_result_code() : nuraft::cmd_result_code::SERVER_NOT_FOUND));
return;
} else if (rsp->get_accepted()) {
LOGDEBUGMOD(nuraft_mesg, "Accepted response");
LOGD("Accepted response");
ctx->set(rsp->get_result_code());
return;
} else if (ctx->_cur_dest == rsp->get_dst()) {
LOGWARN("Request ignored");
LOGW("Request ignored");
ctx->set(rsp->get_result_code());
return;
} else if (0 > rsp->get_dst()) {
LOGWARN("No known leader!");
LOGW("No known leader!");
ctx->set(rsp->get_result_code());
return;
}

// Not accepted: means that `get_dst()` is a new leader.
auto gresp = std::dynamic_pointer_cast< grpc_resp >(rsp);
LOGDEBUGMOD(nuraft_mesg, "Updating destination from {} to {}[{}]", ctx->_cur_dest, rsp->get_dst(),
gresp->dest_addr);
LOGD("Updating destination from {} to {}[{}]", ctx->_cur_dest, rsp->get_dst(), gresp->dest_addr);
ctx->_cur_dest = rsp->get_dst();
auto client = factory->create_client(gresp->dest_addr);

Expand All @@ -123,7 +122,7 @@ void respHandler(std::shared_ptr< ContextType > ctx, std::shared_ptr< nuraft::re
respHandler(ctx, rsp, err);
});

LOGDEBUGMOD(nuraft_mesg, "Creating new message: {}", ctx->_new_srv_addr);
LOGD("Creating new message: {}", ctx->_new_srv_addr);
auto msg = createMessage(ctx->payload(), ctx->_new_srv_addr);
client->send(msg, handler);
}
Expand All @@ -144,7 +143,7 @@ class grpc_error_client : public grpc_base_client {
nuraft::ptr< nuraft::rpc_client > grpc_factory::create_client(std::string const& client) {
try {
return create_client(boost::uuids::string_generator()(client));
} catch (std::runtime_error const& e) { LOGCRITICAL("Client Endpoint Invalid! [{}]", client); }
} catch (std::runtime_error const& e) { LOGC("Client Endpoint Invalid! [{}]", client); }
return nullptr;
}

Expand All @@ -155,17 +154,17 @@ nuraft::ptr< nuraft::rpc_client > grpc_factory::create_client(peer_id_t const& c
auto [it, happened] = _clients.emplace(client, nullptr);
if (_clients.end() != it) {
if (!happened) {
LOGDEBUGMOD(nuraft_mesg, "Re-creating client for {}", client);
LOGD("Re-creating client for {}", client);
if (auto err = reinit_client(client, it->second); nuraft::OK != err) {
LOGERROR("Failed to re-initialize client {}: {}", client, err);
LOGE("Failed to re-initialize client {}: {}", client, err);
new_client = std::make_shared< grpc_error_client >();
} else {
new_client = it->second;
}
} else {
LOGDEBUGMOD(nuraft_mesg, "Creating client for {}", client);
LOGD("Creating client for {}", client);
if (auto err = create_client(client, it->second); nuraft::OK != err) {
LOGERROR("Failed to create client for {}: {}", client, err);
LOGE("Failed to create client for {}: {}", client, err);
new_client = std::make_shared< grpc_error_client >();
} else {
new_client = it->second;
Expand Down
5 changes: 2 additions & 3 deletions src/lib/grpc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,8 @@ grpc_server::append_entries(std::vector< nuraft::ptr< nuraft::buffer > > const&
}

::grpc::Status grpc_server::step(const RaftMessage& request, RaftMessage& reply) {
LOGTRACEMOD(nuraft_mesg, "Stepping [{}] from: [{}] to: [{}]",
nuraft::msg_type_to_string(nuraft::msg_type(request.base().type())), request.base().src(),
request.base().dest());
LOGT("Stepping [{}] from: [{}] to: [{}]", nuraft::msg_type_to_string(nuraft::msg_type(request.base().type())),
request.base().src(), request.base().dest());
auto rcreq = toRequest(request);
auto resp = nuraft::raft_server_handler::process_req(_raft_server.get(), *rcreq);
if (!resp) { return ::grpc::Status(::grpc::StatusCode::CANCELLED, "Server rejected request"); }
Expand Down
33 changes: 22 additions & 11 deletions src/lib/logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <libnuraft/logger.hxx>
#include <sisl/logging/logging.h>

#include "nuraft_mesg/common.hpp"
#include "common_lib.hpp"

class nuraft_mesg_logger : public ::nuraft::logger {
nuraft_mesg::group_id_t const _group_id;
Expand All @@ -29,31 +29,42 @@ class nuraft_mesg_logger : public ::nuraft::logger {
::nuraft::logger(), _group_id(group_id), _custom_logger(custom_logger) {}

void set_level(int l) override {
LOGDEBUGMOD(nuraft_mesg, "Updating level to: {}", l);
LOGD("Updating level to: {}", l);
SISL_LOG_LEVEL(nuraft, static_cast< spdlog::level::level_enum >(abs(l - 6)));
}

void put_details(int level, const char* source_file, const char* func_name, size_t line_number,
const std::string& log_line) override {
auto const mesg =
fmt::format("[group={}] {}:{}#{} : {}", _group_id, file_name(source_file), func_name, line_number, log_line);
switch (level) {
case 1:
[[fallthrough]];
case 1: {
if (LEVELCHECK(nuraft_mesg, spdlog::level::level_enum::critical))
sisl::logging::GetLogger()->critical("[{}:{}:{}] {} [group={}]", file_name(source_file), line_number,
func_name, log_line, _group_id);
} break;
case 2: {
LOGERRORMOD(nuraft, "{}", mesg);
if (LEVELCHECK(nuraft_mesg, spdlog::level::level_enum::err))
sisl::logging::GetLogger()->error("[{}:{}:{}] {} [group={}]", file_name(source_file), line_number,
func_name, log_line, _group_id);
} break;
case 3: {
LOGWARNMOD(nuraft, "{}", mesg);
if (LEVELCHECK(nuraft_mesg, spdlog::level::level_enum::warn))
sisl::logging::GetLogger()->warn("[{}:{}:{}] {} [group={}]", file_name(source_file), line_number,
func_name, log_line, _group_id);
} break;
case 4: {
LOGINFOMOD_USING_LOGGER(nuraft, _custom_logger, "{}", mesg);
if (LEVELCHECK(nuraft_mesg, spdlog::level::level_enum::info))
_custom_logger->info("[{}:{}:{}] {} [group={}]", file_name(source_file), line_number, func_name,
log_line, _group_id);
} break;
case 5: {
LOGDEBUGMOD_USING_LOGGER(nuraft, _custom_logger, "{}", mesg);
if (LEVELCHECK(nuraft_mesg, spdlog::level::level_enum::debug))
_custom_logger->debug("[{}:{}:{}] {} [group={}]", file_name(source_file), line_number, func_name,
log_line, _group_id);
} break;
default: {
LOGTRACEMOD_USING_LOGGER(nuraft, _custom_logger, "{}", mesg);
if (LEVELCHECK(nuraft_mesg, spdlog::level::level_enum::trace))
_custom_logger->trace("[{}:{}:{}] {} [group={}]", file_name(source_file), line_number, func_name,
log_line, _group_id);
} break;
}
}
Expand Down
30 changes: 15 additions & 15 deletions src/lib/manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class engine_factory : public group_factory {
application_(app) {}

std::string lookupEndpoint(peer_id_t const& client) override {
LOGTRACEMOD(nuraft_mesg, "[peer={}]", client);
LOGT("[peer={}]", client);
if (auto a = application_.lock(); a) return a->lookup_peer(client);
return std::string();
}
Expand Down Expand Up @@ -93,7 +93,7 @@ ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< Mes

void ManagerImpl::restart_server() {
auto listen_address = fmt::format(FMT_STRING("0.0.0.0:{}"), start_params_.mesg_port_);
LOGINFO("Starting Messaging Service on http://{}", listen_address);
LOGI("Starting Messaging Service on http://{}", listen_address);

std::lock_guard< std::mutex > lg(_manager_lock);
_grpc_server.reset();
Expand All @@ -111,39 +111,39 @@ void ManagerImpl::register_mgr_type(group_type_t const& group_type, group_params
auto [it, happened] = _state_mgr_types.emplace(std::make_pair(group_type, params));
DEBUG_ASSERT(_state_mgr_types.end() != it, "Out of memory?");
DEBUG_ASSERT(!!happened, "Re-register?");
if (_state_mgr_types.end() == it) { LOGERROR("Could not register group type: {}", group_type); }
if (_state_mgr_types.end() == it) { LOGE("Could not register [group_type={}]", group_type); }
}

nuraft::cb_func::ReturnCode ManagerImpl::callback_handler(group_id_t const& group_id, nuraft::cb_func::Type type,
nuraft::cb_func::Param* param) {
switch (type) {
case nuraft::cb_func::RemovedFromCluster: {
LOGINFO("Removed from cluster [group={}]", group_id);
LOGI("Removed from cluster [group={}]", group_id);
exit_group(group_id);
} break;
case nuraft::cb_func::JoinedCluster: {
auto const my_id = param->myId;
auto const leader_id = param->leaderId;
LOGINFO("Joined cluster: [group={}], [l_id:{},my_id:{}]", group_id, leader_id, my_id);
LOGI("Joined cluster: [group={}], [l_id:{},my_id:{}]", group_id, leader_id, my_id);
{
std::lock_guard< std::mutex > lg(_manager_lock);
_is_leader[group_id] = (leader_id == my_id);
}
} break;
case nuraft::cb_func::NewConfig: {
LOGDEBUGMOD(nuraft_mesg, "Cluster change for: [group={}]", group_id);
LOGD("Cluster change for: [group={}]", group_id);
_config_change.notify_all();
} break;
case nuraft::cb_func::BecomeLeader: {
LOGDEBUGMOD(nuraft_mesg, "I'm the leader of: [group={}]!", group_id);
LOGD("I'm the leader of: [group={}]!", group_id);
{
std::lock_guard< std::mutex > lg(_manager_lock);
_is_leader[group_id] = true;
}
_config_change.notify_all();
} break;
case nuraft::cb_func::BecomeFollower: {
LOGINFOMOD(nuraft_mesg, "I'm a follower of: [group={}]!", group_id);
LOGI("I'm a follower of: [group={}]!", group_id);
{
std::lock_guard< std::mutex > lg(_manager_lock);
_is_leader[group_id] = false;
Expand All @@ -167,7 +167,7 @@ void ManagerImpl::exit_group(group_id_t const& group_id) {
nuraft::cmd_result_code ManagerImpl::group_init(int32_t const srv_id, group_id_t const& group_id,
group_type_t const& group_type, nuraft::context*& ctx,
std::shared_ptr< nuraft_mesg::group_metrics > metrics) {
LOGDEBUGMOD(nuraft_mesg, "Creating context for: [group_id={}] as Member: {}", group_id, srv_id);
LOGD("Creating context for: [group_id={}] as Member: {}", group_id, srv_id);

// State manager (RAFT log store, config)
std::shared_ptr< nuraft::state_mgr > smgr;
Expand All @@ -185,7 +185,7 @@ nuraft::cmd_result_code ManagerImpl::group_init(int32_t const srv_id, group_id_t
if (it != _state_managers.end()) {
if (happened) {
// A new logstore!
LOGDEBUGMOD(nuraft_mesg, "Creating new State Manager for: [group={}], type: {}", group_id, group_type);
LOGD("Creating new State Manager for: [group={}], type: {}", group_id, group_type);
it->second = application_.lock()->create_state_mgr(srv_id, group_id);
}
it->second->become_ready();
Expand Down Expand Up @@ -327,11 +327,11 @@ NullAsyncResult ManagerImpl::become_leader(group_id_t const& group_id) {
}

void ManagerImpl::leave_group(group_id_t const& group_id) {
LOGINFO("Leaving group [group={}]", group_id);
LOGI("Leaving group [group={}]", group_id);
{
std::lock_guard< std::mutex > lg(_manager_lock);
if (0 == _state_managers.count(group_id)) {
LOGDEBUGMOD(nuraft_mesg, "Asked to leave [group={}] which we are not part of!", group_id);
LOGD("Asked to leave [group={}] which we are not part of!", group_id);
return;
}
}
Expand All @@ -345,7 +345,7 @@ void ManagerImpl::leave_group(group_id_t const& group_id) {
_state_managers.erase(it);
}

LOGINFO("Finished leaving: [group={}]", group_id);
LOGI("Finished leaving: [group={}]", group_id);
}

uint32_t ManagerImpl::logstore_id(group_id_t const& group_id) const {
Expand All @@ -354,9 +354,9 @@ uint32_t ManagerImpl::logstore_id(group_id_t const& group_id) const {
return UINT32_MAX;
}

void ManagerImpl::get_srv_config_all(group_id_t const& group_name,
void ManagerImpl::get_srv_config_all(group_id_t const& group_id,
std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) {
_mesg_service->get_srv_config_all(group_name, configs_out);
_mesg_service->get_srv_config_all(group_id, configs_out);
}

bool ManagerImpl::bind_data_service_request(std::string const& request_name, group_id_t const& group_id,
Expand Down
Loading

0 comments on commit d1378de

Please sign in to comment.