diff --git a/src/include/nuraft_mesg/mesg_factory.hpp b/src/include/nuraft_mesg/mesg_factory.hpp index 7bb3839..c3fb876 100644 --- a/src/include/nuraft_mesg/mesg_factory.hpp +++ b/src/include/nuraft_mesg/mesg_factory.hpp @@ -50,7 +50,7 @@ class group_factory : public grpc_factory { class mesg_factory final : public grpc_factory { std::shared_ptr< group_factory > _group_factory; - group_id_t const _group_name; + group_id_t const _group_id; group_type_t const _group_type; std::shared_ptr< sisl::MetricsGroupWrapper > _metrics; @@ -59,11 +59,11 @@ class mesg_factory final : public grpc_factory { std::shared_ptr< sisl::MetricsGroupWrapper > metrics = nullptr) : grpc_factory(0, to_string(grp_id)), _group_factory(g_factory), - _group_name(grp_id), + _group_id(grp_id), _group_type(grp_type), _metrics(metrics) {} - group_id_t group_name() const { return _group_name; } + group_id_t group_id() const { return _group_id; } nuraft::cmd_result_code create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >& rpc_ptr) override; diff --git a/src/lib/common_lib.hpp b/src/lib/common_lib.hpp new file mode 100644 index 0000000..b855b0d --- /dev/null +++ b/src/lib/common_lib.hpp @@ -0,0 +1,12 @@ +#pragma once + +#include + +#include "nuraft_mesg/common.hpp" + +#define LOGT(...) LOGTRACEMOD(nuraft_mesg, ##__VA_ARGS__) +#define LOGD(...) LOGDEBUGMOD(nuraft_mesg, ##__VA_ARGS__) +#define LOGI(...) LOGINFOMOD(nuraft_mesg, ##__VA_ARGS__) +#define LOGW(...) LOGWARNMOD(nuraft_mesg, ##__VA_ARGS__) +#define LOGE(...) LOGERRORMOD(nuraft_mesg, ##__VA_ARGS__) +#define LOGC(...) LOGCRITICALMOD(nuraft_mesg, ##__VA_ARGS__) diff --git a/src/lib/data_service_grpc.cpp b/src/lib/data_service_grpc.cpp index ef39d6c..e57d22a 100644 --- a/src/lib/data_service_grpc.cpp +++ b/src/lib/data_service_grpc.cpp @@ -27,7 +27,7 @@ bool data_service_grpc::bind(std::string const& request_name, group_id_t const& data_service_request_handler_t const& request_cb) { RELEASE_ASSERT(_grpc_server, "NULL _grpc_server!"); if (!request_cb) { - LOGWARNMOD(nuraft_mesg, "request_cb null for the request {}, cannot bind.", request_name); + LOGW("request_cb null for the request {}, cannot bind.", request_name); return false; } // This is an async call, hence the "return false". The user should invoke rpc_data->send_response to finish the @@ -35,7 +35,7 @@ bool data_service_grpc::bind(std::string const& request_name, group_id_t const& auto generic_handler_cb = [request_cb](boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) { sisl::io_blob svr_buf; if (auto status = deserialize_from_byte_buffer(rpc_data->request(), svr_buf); !status.ok()) { - LOGERRORMOD(nuraft_mesg, "ByteBuffer DumpToSingleSlice failed, {}", status.error_message()); + LOGE(, "ByteBuffer DumpToSingleSlice failed, {}", status.error_message()); rpc_data->set_status(status); return true; // respond immediately } @@ -50,7 +50,7 @@ bool data_service_grpc::bind(std::string const& request_name, group_id_t const& throw std::runtime_error(fmt::format("Could not register generic rpc {} with gRPC!", request_name)); } } else { - LOGWARNMOD(nuraft_mesg, "data service rpc {} exists", it->first); + LOGW(, "data service rpc {} exists", it->first); return false; } } else { diff --git a/src/lib/grpc_client.cpp b/src/lib/grpc_client.cpp index 8c8dfe8..925fbc2 100644 --- a/src/lib/grpc_client.cpp +++ b/src/lib/grpc_client.cpp @@ -54,7 +54,7 @@ inline std::shared_ptr< nuraft::resp_msg > toResponse(RaftMessage const& raft_ms resp.next_index(), resp.accepted()); message->set_result_code((nuraft::cmd_result_code)(0 - resp.result_code())); if (nuraft::cmd_result_code::NOT_LEADER == message->get_result_code()) { - LOGINFOMOD(nuraft_mesg, "Leader has changed!"); + LOGI("Leader has changed!"); message->dest_addr = resp.dest_addr(); } if (0 < resp.context().length()) { @@ -67,16 +67,14 @@ inline std::shared_ptr< nuraft::resp_msg > toResponse(RaftMessage const& raft_ms std::atomic_uint64_t grpc_base_client::_client_counter = 0ul; -void grpc_base_client::send(std::shared_ptr< nuraft::req_msg >& req, nuraft::rpc_handler& complete, - uint64_t) { +void grpc_base_client::send(std::shared_ptr< nuraft::req_msg >& req, nuraft::rpc_handler& complete, uint64_t) { assert(req && complete); RaftMessage grpc_request; grpc_request.set_allocated_base(fromBaseRequest(*req)); grpc_request.set_allocated_rc_request(fromRCRequest(*req)); - LOGTRACEMOD(nuraft_mesg, "Sending [{}] from: [{}] to: [{}]", - nuraft::msg_type_to_string(nuraft::msg_type(grpc_request.base().type())), grpc_request.base().src(), - grpc_request.base().dest()); + LOGT("Sending [{}] from: [{}] to: [{}]", nuraft::msg_type_to_string(nuraft::msg_type(grpc_request.base().type())), + grpc_request.base().src(), grpc_request.base().dest()); send(grpc_request, [req, complete](RaftMessage& response, ::grpc::Status& status) mutable -> void { std::shared_ptr< nuraft::rpc_exception > err; diff --git a/src/lib/grpc_client.hpp b/src/lib/grpc_client.hpp index 2c996ab..bb6ddfa 100644 --- a/src/lib/grpc_client.hpp +++ b/src/lib/grpc_client.hpp @@ -23,7 +23,7 @@ #include #include -#include "nuraft_mesg/common.hpp" +#include "common_lib.hpp" namespace nuraft_mesg { @@ -76,13 +76,13 @@ class grpc_client : public grpc_base_client, public sisl::GrpcAsyncClient { void init() override { // Re-create channel only if current channel is busted. - if (!_stub || !is_connection_ready()) { - LOGDEBUGMOD(nuraft_mesg, "Client init ({}) to {}", (!!_stub ? "Again" : "First"), _addr); - sisl::GrpcAsyncClient::init(); - _stub = sisl::GrpcAsyncClient::make_stub< TSERVICE >(_worker_name); - } else { - LOGDEBUGMOD(nuraft_mesg, "Channel looks fine, re-using"); + if (_stub && is_connection_ready()) { + LOGD("Channel looks fine, re-using"); + return; } + LOGD("Client init ({}) to {}", (!!_stub ? "Again" : "First"), _addr); + sisl::GrpcAsyncClient::init(); + _stub = sisl::GrpcAsyncClient::make_stub< TSERVICE >(_worker_name); } protected: diff --git a/src/lib/grpc_factory.cpp b/src/lib/grpc_factory.cpp index 0b578e0..ffbeffa 100644 --- a/src/lib/grpc_factory.cpp +++ b/src/lib/grpc_factory.cpp @@ -93,27 +93,26 @@ void respHandler(std::shared_ptr< ContextType > ctx, std::shared_ptr< nuraft::re std::shared_ptr< nuraft::rpc_exception >& err) { auto factory = ctx->cli_factory(); if (err || !rsp) { - LOGERROR("{}", (err ? err->what() : "No response.")); + LOGE("{}", (err ? err->what() : "No response.")); ctx->set((rsp ? rsp->get_result_code() : nuraft::cmd_result_code::SERVER_NOT_FOUND)); return; } else if (rsp->get_accepted()) { - LOGDEBUGMOD(nuraft_mesg, "Accepted response"); + LOGD("Accepted response"); ctx->set(rsp->get_result_code()); return; } else if (ctx->_cur_dest == rsp->get_dst()) { - LOGWARN("Request ignored"); + LOGW("Request ignored"); ctx->set(rsp->get_result_code()); return; } else if (0 > rsp->get_dst()) { - LOGWARN("No known leader!"); + LOGW("No known leader!"); ctx->set(rsp->get_result_code()); return; } // Not accepted: means that `get_dst()` is a new leader. auto gresp = std::dynamic_pointer_cast< grpc_resp >(rsp); - LOGDEBUGMOD(nuraft_mesg, "Updating destination from {} to {}[{}]", ctx->_cur_dest, rsp->get_dst(), - gresp->dest_addr); + LOGD("Updating destination from {} to {}[{}]", ctx->_cur_dest, rsp->get_dst(), gresp->dest_addr); ctx->_cur_dest = rsp->get_dst(); auto client = factory->create_client(gresp->dest_addr); @@ -123,7 +122,7 @@ void respHandler(std::shared_ptr< ContextType > ctx, std::shared_ptr< nuraft::re respHandler(ctx, rsp, err); }); - LOGDEBUGMOD(nuraft_mesg, "Creating new message: {}", ctx->_new_srv_addr); + LOGD("Creating new message: {}", ctx->_new_srv_addr); auto msg = createMessage(ctx->payload(), ctx->_new_srv_addr); client->send(msg, handler); } @@ -144,7 +143,7 @@ class grpc_error_client : public grpc_base_client { nuraft::ptr< nuraft::rpc_client > grpc_factory::create_client(std::string const& client) { try { return create_client(boost::uuids::string_generator()(client)); - } catch (std::runtime_error const& e) { LOGCRITICAL("Client Endpoint Invalid! [{}]", client); } + } catch (std::runtime_error const& e) { LOGC("Client Endpoint Invalid! [{}]", client); } return nullptr; } @@ -155,17 +154,17 @@ nuraft::ptr< nuraft::rpc_client > grpc_factory::create_client(peer_id_t const& c auto [it, happened] = _clients.emplace(client, nullptr); if (_clients.end() != it) { if (!happened) { - LOGDEBUGMOD(nuraft_mesg, "Re-creating client for {}", client); + LOGD("Re-creating client for {}", client); if (auto err = reinit_client(client, it->second); nuraft::OK != err) { - LOGERROR("Failed to re-initialize client {}: {}", client, err); + LOGE("Failed to re-initialize client {}: {}", client, err); new_client = std::make_shared< grpc_error_client >(); } else { new_client = it->second; } } else { - LOGDEBUGMOD(nuraft_mesg, "Creating client for {}", client); + LOGD("Creating client for {}", client); if (auto err = create_client(client, it->second); nuraft::OK != err) { - LOGERROR("Failed to create client for {}: {}", client, err); + LOGE("Failed to create client for {}: {}", client, err); new_client = std::make_shared< grpc_error_client >(); } else { new_client = it->second; diff --git a/src/lib/grpc_server.cpp b/src/lib/grpc_server.cpp index 95b697d..d906da7 100644 --- a/src/lib/grpc_server.cpp +++ b/src/lib/grpc_server.cpp @@ -71,9 +71,8 @@ grpc_server::append_entries(std::vector< nuraft::ptr< nuraft::buffer > > const& } ::grpc::Status grpc_server::step(const RaftMessage& request, RaftMessage& reply) { - LOGTRACEMOD(nuraft_mesg, "Stepping [{}] from: [{}] to: [{}]", - nuraft::msg_type_to_string(nuraft::msg_type(request.base().type())), request.base().src(), - request.base().dest()); + LOGT("Stepping [{}] from: [{}] to: [{}]", nuraft::msg_type_to_string(nuraft::msg_type(request.base().type())), + request.base().src(), request.base().dest()); auto rcreq = toRequest(request); auto resp = nuraft::raft_server_handler::process_req(_raft_server.get(), *rcreq); if (!resp) { return ::grpc::Status(::grpc::StatusCode::CANCELLED, "Server rejected request"); } diff --git a/src/lib/logger.hpp b/src/lib/logger.hpp index 8eb0fad..a7d0966 100644 --- a/src/lib/logger.hpp +++ b/src/lib/logger.hpp @@ -17,7 +17,7 @@ #include #include -#include "nuraft_mesg/common.hpp" +#include "common_lib.hpp" class nuraft_mesg_logger : public ::nuraft::logger { nuraft_mesg::group_id_t const _group_id; @@ -29,31 +29,42 @@ class nuraft_mesg_logger : public ::nuraft::logger { ::nuraft::logger(), _group_id(group_id), _custom_logger(custom_logger) {} void set_level(int l) override { - LOGDEBUGMOD(nuraft_mesg, "Updating level to: {}", l); + LOGD("Updating level to: {}", l); SISL_LOG_LEVEL(nuraft, static_cast< spdlog::level::level_enum >(abs(l - 6))); } void put_details(int level, const char* source_file, const char* func_name, size_t line_number, const std::string& log_line) override { - auto const mesg = - fmt::format("[group={}] {}:{}#{} : {}", _group_id, file_name(source_file), func_name, line_number, log_line); switch (level) { - case 1: - [[fallthrough]]; + case 1: { + if (LEVELCHECK(nuraft_mesg, spdlog::level::level_enum::critical)) + sisl::logging::GetLogger()->critical("[{}:{}:{}] {} [group={}]", file_name(source_file), line_number, + func_name, log_line, _group_id); + } break; case 2: { - LOGERRORMOD(nuraft, "{}", mesg); + if (LEVELCHECK(nuraft_mesg, spdlog::level::level_enum::err)) + sisl::logging::GetLogger()->error("[{}:{}:{}] {} [group={}]", file_name(source_file), line_number, + func_name, log_line, _group_id); } break; case 3: { - LOGWARNMOD(nuraft, "{}", mesg); + if (LEVELCHECK(nuraft_mesg, spdlog::level::level_enum::warn)) + sisl::logging::GetLogger()->warn("[{}:{}:{}] {} [group={}]", file_name(source_file), line_number, + func_name, log_line, _group_id); } break; case 4: { - LOGINFOMOD_USING_LOGGER(nuraft, _custom_logger, "{}", mesg); + if (LEVELCHECK(nuraft_mesg, spdlog::level::level_enum::info)) + _custom_logger->info("[{}:{}:{}] {} [group={}]", file_name(source_file), line_number, func_name, + log_line, _group_id); } break; case 5: { - LOGDEBUGMOD_USING_LOGGER(nuraft, _custom_logger, "{}", mesg); + if (LEVELCHECK(nuraft_mesg, spdlog::level::level_enum::debug)) + _custom_logger->debug("[{}:{}:{}] {} [group={}]", file_name(source_file), line_number, func_name, + log_line, _group_id); } break; default: { - LOGTRACEMOD_USING_LOGGER(nuraft, _custom_logger, "{}", mesg); + if (LEVELCHECK(nuraft_mesg, spdlog::level::level_enum::trace)) + _custom_logger->trace("[{}:{}:{}] {} [group={}]", file_name(source_file), line_number, func_name, + log_line, _group_id); } break; } } diff --git a/src/lib/manager_impl.cpp b/src/lib/manager_impl.cpp index f36043c..79c2849 100644 --- a/src/lib/manager_impl.cpp +++ b/src/lib/manager_impl.cpp @@ -42,7 +42,7 @@ class engine_factory : public group_factory { application_(app) {} std::string lookupEndpoint(peer_id_t const& client) override { - LOGTRACEMOD(nuraft_mesg, "[peer={}]", client); + LOGT("[peer={}]", client); if (auto a = application_.lock(); a) return a->lookup_peer(client); return std::string(); } @@ -93,7 +93,7 @@ ManagerImpl::ManagerImpl(Manager::Params const& start_params, std::weak_ptr< Mes void ManagerImpl::restart_server() { auto listen_address = fmt::format(FMT_STRING("0.0.0.0:{}"), start_params_.mesg_port_); - LOGINFO("Starting Messaging Service on http://{}", listen_address); + LOGI("Starting Messaging Service on http://{}", listen_address); std::lock_guard< std::mutex > lg(_manager_lock); _grpc_server.reset(); @@ -111,31 +111,31 @@ void ManagerImpl::register_mgr_type(group_type_t const& group_type, group_params auto [it, happened] = _state_mgr_types.emplace(std::make_pair(group_type, params)); DEBUG_ASSERT(_state_mgr_types.end() != it, "Out of memory?"); DEBUG_ASSERT(!!happened, "Re-register?"); - if (_state_mgr_types.end() == it) { LOGERROR("Could not register group type: {}", group_type); } + if (_state_mgr_types.end() == it) { LOGE("Could not register [group_type={}]", group_type); } } nuraft::cb_func::ReturnCode ManagerImpl::callback_handler(group_id_t const& group_id, nuraft::cb_func::Type type, nuraft::cb_func::Param* param) { switch (type) { case nuraft::cb_func::RemovedFromCluster: { - LOGINFO("Removed from cluster [group={}]", group_id); + LOGI("Removed from cluster [group={}]", group_id); exit_group(group_id); } break; case nuraft::cb_func::JoinedCluster: { auto const my_id = param->myId; auto const leader_id = param->leaderId; - LOGINFO("Joined cluster: [group={}], [l_id:{},my_id:{}]", group_id, leader_id, my_id); + LOGI("Joined cluster: [group={}], [l_id:{},my_id:{}]", group_id, leader_id, my_id); { std::lock_guard< std::mutex > lg(_manager_lock); _is_leader[group_id] = (leader_id == my_id); } } break; case nuraft::cb_func::NewConfig: { - LOGDEBUGMOD(nuraft_mesg, "Cluster change for: [group={}]", group_id); + LOGD("Cluster change for: [group={}]", group_id); _config_change.notify_all(); } break; case nuraft::cb_func::BecomeLeader: { - LOGDEBUGMOD(nuraft_mesg, "I'm the leader of: [group={}]!", group_id); + LOGD("I'm the leader of: [group={}]!", group_id); { std::lock_guard< std::mutex > lg(_manager_lock); _is_leader[group_id] = true; @@ -143,7 +143,7 @@ nuraft::cb_func::ReturnCode ManagerImpl::callback_handler(group_id_t const& grou _config_change.notify_all(); } break; case nuraft::cb_func::BecomeFollower: { - LOGINFOMOD(nuraft_mesg, "I'm a follower of: [group={}]!", group_id); + LOGI("I'm a follower of: [group={}]!", group_id); { std::lock_guard< std::mutex > lg(_manager_lock); _is_leader[group_id] = false; @@ -167,7 +167,7 @@ void ManagerImpl::exit_group(group_id_t const& group_id) { nuraft::cmd_result_code ManagerImpl::group_init(int32_t const srv_id, group_id_t const& group_id, group_type_t const& group_type, nuraft::context*& ctx, std::shared_ptr< nuraft_mesg::group_metrics > metrics) { - LOGDEBUGMOD(nuraft_mesg, "Creating context for: [group_id={}] as Member: {}", group_id, srv_id); + LOGD("Creating context for: [group_id={}] as Member: {}", group_id, srv_id); // State manager (RAFT log store, config) std::shared_ptr< nuraft::state_mgr > smgr; @@ -185,7 +185,7 @@ nuraft::cmd_result_code ManagerImpl::group_init(int32_t const srv_id, group_id_t if (it != _state_managers.end()) { if (happened) { // A new logstore! - LOGDEBUGMOD(nuraft_mesg, "Creating new State Manager for: [group={}], type: {}", group_id, group_type); + LOGD("Creating new State Manager for: [group={}], type: {}", group_id, group_type); it->second = application_.lock()->create_state_mgr(srv_id, group_id); } it->second->become_ready(); @@ -327,11 +327,11 @@ NullAsyncResult ManagerImpl::become_leader(group_id_t const& group_id) { } void ManagerImpl::leave_group(group_id_t const& group_id) { - LOGINFO("Leaving group [group={}]", group_id); + LOGI("Leaving group [group={}]", group_id); { std::lock_guard< std::mutex > lg(_manager_lock); if (0 == _state_managers.count(group_id)) { - LOGDEBUGMOD(nuraft_mesg, "Asked to leave [group={}] which we are not part of!", group_id); + LOGD("Asked to leave [group={}] which we are not part of!", group_id); return; } } @@ -345,7 +345,7 @@ void ManagerImpl::leave_group(group_id_t const& group_id) { _state_managers.erase(it); } - LOGINFO("Finished leaving: [group={}]", group_id); + LOGI("Finished leaving: [group={}]", group_id); } uint32_t ManagerImpl::logstore_id(group_id_t const& group_id) const { @@ -354,9 +354,9 @@ uint32_t ManagerImpl::logstore_id(group_id_t const& group_id) const { return UINT32_MAX; } -void ManagerImpl::get_srv_config_all(group_id_t const& group_name, +void ManagerImpl::get_srv_config_all(group_id_t const& group_id, std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) { - _mesg_service->get_srv_config_all(group_name, configs_out); + _mesg_service->get_srv_config_all(group_id, configs_out); } bool ManagerImpl::bind_data_service_request(std::string const& request_name, group_id_t const& group_id, diff --git a/src/lib/manager_impl.hpp b/src/lib/manager_impl.hpp index 4c721de..398a680 100644 --- a/src/lib/manager_impl.hpp +++ b/src/lib/manager_impl.hpp @@ -24,6 +24,7 @@ #include #include "nuraft_mesg/mesg_factory.hpp" +#include "common_lib.hpp" namespace sisl { class GrpcServer; @@ -89,7 +90,7 @@ class ManagerImpl : public Manager { bool bind_data_service_request(std::string const& request_name, group_id_t const& group_id, data_service_request_handler_t const& request_handler) override; - void get_srv_config_all(group_id_t const& group_name, + void get_srv_config_all(group_id_t const& group_id, std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) override; }; diff --git a/src/lib/mesg_client.cpp b/src/lib/mesg_client.cpp index 689a17c..c9d4bd1 100644 --- a/src/lib/mesg_client.cpp +++ b/src/lib/mesg_client.cpp @@ -49,13 +49,13 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha if (::grpc::INVALID_ARGUMENT == status.error_code()) { if (auto mc = weak_this.lock(); mc) { mc->bad_service.fetch_add(1, std::memory_order_relaxed); - LOGERRORMOD( - nuraft_mesg, + LOGE( + "Sent message to wrong service, need to disconnect! Error Message: [{}] Client IP: [{}]", status.error_message(), mc->_addr); } else { - LOGERRORMOD(nuraft_mesg, "Sent message to wrong service, need to disconnect! Error Message: [{}]", - status.error_message()); + LOGE("Sent message to wrong service, need to disconnect! Error Message: [{}]", + status.error_message()); } } complete(*response.mutable_msg(), status); @@ -76,7 +76,7 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha cli_byte_buf, request_name, [c = copyable_promise](grpc::ByteBuffer& resp, ::grpc::Status& status) mutable { if (!status.ok()) { - LOGERRORMOD(nuraft_mesg, "Failed to send data_service_request, error: {}", status.error_message()); + LOGE("Failed to send data_service_request, error: {}", status.error_message()); c->setValue(folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED)); } else { sisl::io_blob svr_buf; @@ -95,7 +95,7 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha class group_client : public grpc_base_client { std::shared_ptr< messaging_client > _client; - group_id_t const _group_name; + group_id_t const _group_id; group_type_t const _group_type; std::shared_ptr< group_metrics > _metrics; std::string const _client_addr; @@ -105,7 +105,7 @@ class group_client : public grpc_base_client { group_type_t const& grp_type, std::shared_ptr< sisl::MetricsGroupWrapper > metrics) : grpc_base_client(), _client(client), - _group_name(grp_name), + _group_id(grp_name), _group_type(grp_type), _metrics(std::static_pointer_cast< group_metrics >(metrics)), _client_addr(to_string(client_addr)) {} @@ -118,12 +118,12 @@ class group_client : public grpc_base_client { void send(RaftMessage const& message, handle_resp complete) override { RaftGroupMsg group_msg; - LOGTRACEMOD(nuraft_mesg, "Sending [{}] from: [{}] to: [{}] Group: [{}]", - nuraft::msg_type_to_string(nuraft::msg_type(message.base().type())), message.base().src(), - message.base().dest(), _group_name); + LOGT("Sending [{}] from: [{}] to: [{}] Group: [{}]", + nuraft::msg_type_to_string(nuraft::msg_type(message.base().type())), message.base().src(), + message.base().dest(), _group_id); if (_metrics) { COUNTER_INCREMENT(*_metrics, group_sends, 1); } group_msg.set_intended_addr(_client_addr); - group_msg.set_group_name(to_string(_group_name)); + group_msg.set_group_id(to_string(_group_id)); group_msg.set_group_type(_group_type); group_msg.mutable_msg()->CopyFrom(message); _client->send(group_msg, complete); @@ -137,16 +137,16 @@ class group_client : public grpc_base_client { nuraft::cmd_result_code mesg_factory::create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >& raft_client) { // Re-direct this call to a global factory so we can re-use clients to the same endpoints - LOGDEBUGMOD(nuraft_mesg, "Creating client to {}", client); + LOGD("Creating client to {}", client); auto m_client = std::dynamic_pointer_cast< messaging_client >(_group_factory->create_client(to_string(client))); if (!m_client) return nuraft::CANCELLED; - raft_client = std::make_shared< group_client >(m_client, client, _group_name, _group_type, _metrics); + raft_client = std::make_shared< group_client >(m_client, client, _group_id, _group_type, _metrics); return (!raft_client) ? nuraft::BAD_REQUEST : nuraft::OK; } nuraft::cmd_result_code mesg_factory::reinit_client(peer_id_t const& client, std::shared_ptr< nuraft::rpc_client >& raft_client) { - LOGDEBUGMOD(nuraft_mesg, "Re-init client to {}", client); + LOGD("Re-init client to {}", client); auto g_client = std::dynamic_pointer_cast< group_client >(raft_client); auto new_raft_client = std::static_pointer_cast< nuraft::rpc_client >(g_client->realClient()); if (auto err = _group_factory->reinit_client(client, new_raft_client); err) { return err; } @@ -160,7 +160,7 @@ AsyncResult< sisl::io_blob > mesg_factory::data_service_request(std::string cons auto calls = std::list< AsyncResult< sisl::io_blob > >(); for (auto& nuraft_client : _clients) { auto g_client = std::dynamic_pointer_cast< nuraft_mesg::group_client >(nuraft_client.second); - calls.push_back(g_client->data_service_request(get_generic_method_name(request_name, _group_name), cli_buf)); + calls.push_back(g_client->data_service_request(get_generic_method_name(request_name, _group_id), cli_buf)); } return folly::collectAnyWithoutException(calls).deferValue([](auto&& p) { return p.second; }); } @@ -173,11 +173,11 @@ group_factory::group_factory(int const cli_thread_count, group_id_t const& name, nuraft::cmd_result_code group_factory::create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >& raft_client) { - LOGDEBUGMOD(nuraft_mesg, "Creating client to {}", client); + LOGD("Creating client to {}", client); auto endpoint = lookupEndpoint(client); if (endpoint.empty()) return nuraft::BAD_REQUEST; - LOGDEBUGMOD(nuraft_mesg, "Creating client for [{}] @ [{}]", client, endpoint); + LOGD("Creating client for [{}] @ [{}]", client, endpoint); raft_client = sisl::GrpcAsyncClient::make< messaging_client >(workerName(), endpoint, m_token_client, "", m_ssl_cert); return (!raft_client) ? nuraft::CANCELLED : nuraft::OK; @@ -185,7 +185,7 @@ nuraft::cmd_result_code group_factory::create_client(peer_id_t const& client, nuraft::cmd_result_code group_factory::reinit_client(peer_id_t const& client, std::shared_ptr< nuraft::rpc_client >& raft_client) { - LOGDEBUGMOD(nuraft_mesg, "Re-init client to {}", client); + LOGD("Re-init client to {}", client); assert(raft_client); auto mesg_client = std::dynamic_pointer_cast< messaging_client >(raft_client); if (!mesg_client->is_connection_ready() || 0 < mesg_client->bad_service.load(std::memory_order_relaxed)) { diff --git a/src/lib/service.cpp b/src/lib/service.cpp index 55fa95e..2763ede 100644 --- a/src/lib/service.cpp +++ b/src/lib/service.cpp @@ -30,16 +30,14 @@ SISL_OPTION_GROUP(nuraft_mesg, p->setValue(folly::Unit()); \ }); \ return std::move(sf); \ - } catch (std::runtime_error & rte) { \ - LOGERRORMOD(nuraft_mesg, "Caught exception: [group={}] {}", group_name, rte.what()); \ - } + } catch (std::runtime_error & rte) { LOGE("Caught exception: [group={}] {}", group_id, rte.what()); } namespace nuraft_mesg { using AsyncRaftSvc = Messaging::AsyncService; -grpc_server_wrapper::grpc_server_wrapper(group_id_t const& group_name) { - if (0 < SISL_OPTIONS.count("msg_metrics")) m_metrics = std::make_shared< group_metrics >(group_name); +grpc_server_wrapper::grpc_server_wrapper(group_id_t const& group_id) { + if (0 < SISL_OPTIONS.count("msg_metrics")) m_metrics = std::make_shared< group_metrics >(group_id); } msg_service::msg_service(get_server_ctx_cb get_server_ctx, group_id_t const& service_address, @@ -62,7 +60,7 @@ msg_service::~msg_service() { void msg_service::associate(::sisl::GrpcServer* server) { RELEASE_ASSERT(server, "NULL server!"); if (!server->register_async_service< Messaging >()) { - LOGERRORMOD(nuraft_mesg, "Could not register RaftSvc with gRPC!"); + LOGE("Could not register RaftSvc with gRPC!"); abort(); } if (_data_service_enabled) { @@ -76,7 +74,7 @@ void msg_service::bind(::sisl::GrpcServer* server) { if (!server->register_rpc< Messaging, RaftGroupMsg, RaftGroupMsg, false >( "RaftStep", &AsyncRaftSvc::RequestRaftStep, std::bind(&msg_service::raftStep, this, std::placeholders::_1))) { - LOGERRORMOD(nuraft_mesg, "Could not bind gRPC ::RaftStep to routine!"); + LOGE("Could not bind gRPC ::RaftStep to routine!"); abort(); } if (_data_service_enabled) { _data_service.bind(); } @@ -85,72 +83,68 @@ void msg_service::bind(::sisl::GrpcServer* server) { bool msg_service::bind_data_service_request(std::string const& request_name, group_id_t const& group_id, data_service_request_handler_t const& request_handler) { if (!_data_service_enabled) { - LOGERRORMOD(nuraft_mesg, "Could not register data service method {}; data service is null", request_name); + LOGE("Could not register data service method {}; data service is null", request_name); return false; } return _data_service.bind(request_name, group_id, request_handler); } -NullAsyncResult msg_service::add_srv(group_id_t const& group_name, nuraft::srv_config const& cfg) { +NullAsyncResult msg_service::add_srv(group_id_t const& group_id, nuraft::srv_config const& cfg) { std::shared_ptr< grpc_server > server; { std::shared_lock< lock_type > rl(_raft_servers_lock); - if (auto it = _raft_servers.find(group_name); _raft_servers.end() != it) { server = it->second.m_server; } + if (auto it = _raft_servers.find(group_id); _raft_servers.end() != it) { server = it->second.m_server; } } if (server) { CONTINUE_RESP(server->add_srv(cfg)) } return folly::makeUnexpected(nuraft::SERVER_NOT_FOUND); } -NullAsyncResult msg_service::rm_srv(group_id_t const& group_name, int const member_id) { +NullAsyncResult msg_service::rm_srv(group_id_t const& group_id, int const member_id) { std::shared_ptr< grpc_server > server; { std::shared_lock< lock_type > rl(_raft_servers_lock); - if (auto it = _raft_servers.find(group_name); _raft_servers.end() != it) { server = it->second.m_server; } + if (auto it = _raft_servers.find(group_id); _raft_servers.end() != it) { server = it->second.m_server; } } if (server) { CONTINUE_RESP(server->rem_srv(member_id)) } return folly::makeUnexpected(nuraft::SERVER_NOT_FOUND); } -bool msg_service::request_leadership(group_id_t const& group_name) { +bool msg_service::request_leadership(group_id_t const& group_id) { std::shared_ptr< grpc_server > server; { std::shared_lock< lock_type > rl(_raft_servers_lock); - if (auto it = _raft_servers.find(group_name); _raft_servers.end() != it) { server = it->second.m_server; } + if (auto it = _raft_servers.find(group_id); _raft_servers.end() != it) { server = it->second.m_server; } } if (server) { try { return server->request_leadership(); - } catch (std::runtime_error& rte) { - LOGERRORMOD(nuraft_mesg, "Caught exception during request_leadership(): {}", rte.what()) - } + } catch (std::runtime_error& rte) { LOGE("Caught exception during request_leadership(): {}", rte.what()) } } return false; } -void msg_service::get_srv_config_all(group_id_t const& group_name, +void msg_service::get_srv_config_all(group_id_t const& group_id, std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out) { std::shared_ptr< grpc_server > server; { std::shared_lock< lock_type > rl(_raft_servers_lock); - if (auto it = _raft_servers.find(group_name); _raft_servers.end() != it) { server = it->second.m_server; } + if (auto it = _raft_servers.find(group_id); _raft_servers.end() != it) { server = it->second.m_server; } } if (server) { try { server->get_srv_config_all(configs_out); return; - } catch (std::runtime_error& rte) { - LOGERRORMOD(nuraft_mesg, "Caught exception during add_srv(): {}", rte.what()); - } + } catch (std::runtime_error& rte) { LOGE("Caught exception during add_srv(): {}", rte.what()); } } } -NullAsyncResult msg_service::append_entries(group_id_t const& group_name, +NullAsyncResult msg_service::append_entries(group_id_t const& group_id, std::vector< nuraft::ptr< nuraft::buffer > > const& logs) { std::shared_ptr< grpc_server > server; { std::shared_lock< lock_type > rl(_raft_servers_lock); - if (auto it = _raft_servers.find(group_name); _raft_servers.end() != it) { server = it->second.m_server; } + if (auto it = _raft_servers.find(group_id); _raft_servers.end() != it) { server = it->second.m_server; } } if (server) { CONTINUE_RESP(server->append_entries(logs)) } return folly::makeUnexpected(nuraft::SERVER_NOT_FOUND); @@ -164,34 +158,34 @@ void msg_service::setDefaultGroupType(std::string const& _type) { bool msg_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg, RaftGroupMsg >& rpc_data) { auto& request = rpc_data->request(); auto& response = rpc_data->response(); - auto const& group_name = request.group_name(); + auto const& group_id = request.group_id(); auto const& intended_addr = request.intended_addr(); auto gid = boost::uuids::uuid(); auto sid = boost::uuids::uuid(); try { - gid = boost::uuids::string_generator()(group_name); + gid = boost::uuids::string_generator()(group_id); sid = boost::uuids::string_generator()(intended_addr); } catch (std::runtime_error const& e) { - LOGWARNMOD(nuraft_mesg, "Recieved mesg for {}:{} which is not a valid UUID!", group_name, intended_addr); + LOGW("Recieved mesg for [group={}] [addr={}] which is not a valid UUID!", group_id, intended_addr); rpc_data->set_status( - ::grpc::Status(::grpc::INVALID_ARGUMENT, fmt::format(FMT_STRING("Bad GroupID {}"), group_name))); + ::grpc::Status(::grpc::INVALID_ARGUMENT, fmt::format(FMT_STRING("Bad GroupID {}"), group_id))); return true; } // Verify this is for the service it was intended for auto const& base = request.msg().base(); if (sid != _service_address) { - LOGWARNMOD(nuraft_mesg, "Recieved mesg for {} intended for {}, we are {}", - nuraft::msg_type_to_string(nuraft::msg_type(base.type())), intended_addr, _service_address); + LOGW("Recieved mesg for {} intended for {}, we are {}", + nuraft::msg_type_to_string(nuraft::msg_type(base.type())), intended_addr, _service_address); rpc_data->set_status(::grpc::Status( ::grpc::INVALID_ARGUMENT, fmt::format(FMT_STRING("intended addr: [{}], our addr: [{}]"), intended_addr, _service_address))); return true; } - LOGTRACEMOD(nuraft_mesg, "Received [{}] from: [{}] to: [{}] Group: [{}]", - nuraft::msg_type_to_string(nuraft::msg_type(base.type())), base.src(), base.dest(), group_name); + LOGT("Received [{}] from: [{}] to: [{}] Group: [{}]", nuraft::msg_type_to_string(nuraft::msg_type(base.type())), + base.src(), base.dest(), group_id); // JoinClusterRequests are expected to be received upon Cluster creation by the current leader. We need // to initialize a RaftServer context based on the corresponding type prior to servicing this request. This @@ -211,7 +205,7 @@ bool msg_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg, // Setup our response and process the request. Group types are able to register a Callback that expects a Nullary // to process the requests and send back possibly asynchronous responses in a seperate context. This can be used // to offload the Raft append operations onto a seperate thread group. - response.set_group_name(group_name); + response.set_group_id(group_id); if (server) { /// TODO replace this ugly hack // if (auto offload = _get_process_offload(request.group_type()); nullptr != offload) { @@ -226,13 +220,11 @@ bool msg_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg, try { rpc_data->set_status(server->step(request.msg(), *response.mutable_msg())); return true; - } catch (std::runtime_error& rte) { - LOGERRORMOD(nuraft_mesg, "Caught exception during step(): {}", rte.what()); - } + } catch (std::runtime_error& rte) { LOGE("Caught exception during step(): {}", rte.what()); } } else { - LOGDEBUGMOD(nuraft_mesg, "Missing RAFT group: {}", group_name); + LOGD("Missing [group={}]", group_id); } - rpc_data->set_status(::grpc::Status(::grpc::NOT_FOUND, fmt::format("Missing RAFT group {}", group_name))); + rpc_data->set_status(::grpc::Status(::grpc::NOT_FOUND, fmt::format("Missing RAFT group {}", group_id))); return true; } @@ -254,30 +246,28 @@ class msg_group_listner : public nuraft::rpc_listener { msg_group_listner(std::shared_ptr< msg_service > svc, group_id_t const& group) : _svc(svc), _group(group) {} ~msg_group_listner() { _svc->shutdown_for(_group); } - void listen(nuraft::ptr< nuraft::msg_handler >&) override { - LOGINFOMOD(nuraft_mesg, "Begin listening on {}", _group); - } - void stop() override { LOGINFOMOD(nuraft_mesg, "Stop {}", _group); } - void shutdown() override { LOGINFOMOD(nuraft_mesg, "Shutdown {}", _group); } + void listen(nuraft::ptr< nuraft::msg_handler >&) override { LOGI("[group={}]", _group); } + void stop() override { LOGI("[group={}]", _group); } + void shutdown() override { LOGI("[group={}]", _group); } }; -void msg_service::shutdown_for(group_id_t const& group_name) { +void msg_service::shutdown_for(group_id_t const& group_id) { { std::unique_lock< lock_type > lck(_raft_servers_lock); - LOGDEBUGMOD(nuraft_mesg, "Shutting down RAFT group: {}", group_name); - if (auto it = _raft_servers.find(group_name); _raft_servers.end() != it) { + LOGD("Shutting down [group={}]", group_id); + if (auto it = _raft_servers.find(group_id); _raft_servers.end() != it) { _raft_servers.erase(it); } else { - LOGWARNMOD(nuraft_mesg, "Unknown RAFT group: {} cannot shutdown.", group_name); + LOGW("Unknown [group={}] cannot shutdown.", group_id); return; } } _raft_servers_sync.notify_all(); } -nuraft::cmd_result_code msg_service::joinRaftGroup(int32_t const srv_id, group_id_t const& group_name, +nuraft::cmd_result_code msg_service::joinRaftGroup(int32_t const srv_id, group_id_t const& group_id, group_type_t const& group_type) { - LOGINFOMOD(nuraft_mesg, "Joining RAFT group: {}, type: {}", group_name, group_type); + LOGI("Joining RAFT [group={}], type: {}", group_id, group_type); nuraft::context* ctx{nullptr}; bool happened{false}; @@ -287,14 +277,14 @@ nuraft::cmd_result_code msg_service::joinRaftGroup(int32_t const srv_id, group_i auto g_type = group_type; std::unique_lock< lock_type > lck(_raft_servers_lock); if (g_type.empty()) { g_type = _default_group_type; } - std::tie(it, happened) = _raft_servers.emplace(std::make_pair(group_name, group_name)); + std::tie(it, happened) = _raft_servers.emplace(std::make_pair(group_id, group_id)); if (_raft_servers.end() != it && happened) { - if (auto err = _get_server_ctx(srv_id, group_name, g_type, ctx, it->second.m_metrics); err) { - LOGERRORMOD(nuraft_mesg, "Error during RAFT server creation on group {}: {}", group_name, err); + if (auto err = _get_server_ctx(srv_id, group_id, g_type, ctx, it->second.m_metrics); err) { + LOGE("Error during RAFT server creation [group={}]: {}", group_id, err); return err; } DEBUG_ASSERT(!ctx->rpc_listener_, "RPC listner should not be set!"); - auto new_listner = std::make_shared< msg_group_listner >(shared_from_this(), group_name); + auto new_listner = std::make_shared< msg_group_listner >(shared_from_this(), group_id); ctx->rpc_listener_ = std::static_pointer_cast< nuraft::rpc_listener >(new_listner); auto server = std::make_shared< nuraft::raft_server >(ctx); it->second.m_server = std::make_shared< null_service >(server); @@ -308,28 +298,28 @@ nuraft::cmd_result_code msg_service::joinRaftGroup(int32_t const srv_id, group_i return nuraft::cmd_result_code::OK; } -void msg_service::partRaftGroup(group_id_t const& group_name) { +void msg_service::partRaftGroup(group_id_t const& group_id) { std::shared_ptr< grpc_server > server; { std::unique_lock< lock_type > lck(_raft_servers_lock); - if (auto it = _raft_servers.find(group_name); _raft_servers.end() != it) { + if (auto it = _raft_servers.find(group_id); _raft_servers.end() != it) { server = it->second.m_server; } else { - LOGWARNMOD(nuraft_mesg, "Unknown RAFT group: {} cannot part.", group_name); + LOGW("Unknown [group={}] cannot part.", group_id); return; } } if (auto raft_server = server->raft_server(); raft_server) { - LOGINFOMOD(nuraft_mesg, "Parting RAFT group: {}", group_name); + LOGI("[group={}]", group_id); raft_server->stop_server(); raft_server->shutdown(); } } void msg_service::shutdown() { - LOGINFOMOD(nuraft_mesg, "MessagingService shutdown started."); + LOGI("MessagingService shutdown started."); std::deque< std::shared_ptr< grpc_server > > servers; { @@ -348,7 +338,7 @@ void msg_service::shutdown() { std::unique_lock< lock_type > lck(_raft_servers_lock); _raft_servers_sync.wait(lck, [this]() { return _raft_servers.empty(); }); } - LOGINFOMOD(nuraft_mesg, "MessagingService shutdown complete."); + LOGI("MessagingService shutdown complete."); } } // namespace nuraft_mesg diff --git a/src/lib/service.hpp b/src/lib/service.hpp index 56180be..16830ed 100644 --- a/src/lib/service.hpp +++ b/src/lib/service.hpp @@ -30,8 +30,8 @@ using lock_type = folly::SharedMutex; class group_metrics : public sisl::MetricsGroupWrapper { public: - explicit group_metrics(group_id_t const& group_name) : - sisl::MetricsGroupWrapper("RAFTGroup", to_string(group_name).c_str()) { + explicit group_metrics(group_id_t const& group_id) : + sisl::MetricsGroupWrapper("RAFTGroup", to_string(group_id).c_str()) { REGISTER_COUNTER(group_steps, "Total group messages received", "raft_group", {"op", "step"}); REGISTER_COUNTER(group_sends, "Total group messages sent", "raft_group", {"op", "send"}); register_me_to_farm(); @@ -48,7 +48,7 @@ using get_server_ctx_cb = using data_service_t = data_service_grpc; struct grpc_server_wrapper { - explicit grpc_server_wrapper(group_id_t const& group_name); + explicit grpc_server_wrapper(group_id_t const& group_id); std::shared_ptr< grpc_server > m_server; std::shared_ptr< group_metrics > m_metrics; @@ -77,13 +77,13 @@ class msg_service : public std::enable_shared_from_this< msg_service > { void shutdown(); - NullAsyncResult add_srv(group_id_t const& group_name, nuraft::srv_config const& cfg); - NullAsyncResult append_entries(group_id_t const& group_name, + NullAsyncResult add_srv(group_id_t const& group_id, nuraft::srv_config const& cfg); + NullAsyncResult append_entries(group_id_t const& group_id, std::vector< nuraft::ptr< nuraft::buffer > > const& logs); - NullAsyncResult rm_srv(group_id_t const& group_name, int const member_id); + NullAsyncResult rm_srv(group_id_t const& group_id, int const member_id); - bool request_leadership(group_id_t const& group_name); - void get_srv_config_all(group_id_t const& group_name, + bool request_leadership(group_id_t const& group_id); + void get_srv_config_all(group_id_t const& group_id, std::vector< std::shared_ptr< nuraft::srv_config > >& configs_out); void associate(sisl::GrpcServer* server); @@ -96,14 +96,14 @@ class msg_service : public std::enable_shared_from_this< msg_service > { void setDefaultGroupType(std::string const& _type); - nuraft::cmd_result_code createRaftGroup(int const srv_id, group_id_t const& group_name, + nuraft::cmd_result_code createRaftGroup(int const srv_id, group_id_t const& group_id, group_type_t const& group_type) { - return joinRaftGroup(srv_id, group_name, group_type); + return joinRaftGroup(srv_id, group_id, group_type); } - nuraft::cmd_result_code joinRaftGroup(int32_t srv_id, group_id_t const& group_name, group_type_t const&); + nuraft::cmd_result_code joinRaftGroup(int32_t srv_id, group_id_t const& group_id, group_type_t const&); - void partRaftGroup(group_id_t const& group_name); + void partRaftGroup(group_id_t const& group_id); // Internal intent only void shutdown_for(group_id_t const&); diff --git a/src/lib/utils.hpp b/src/lib/utils.hpp index 72220a3..5601741 100644 --- a/src/lib/utils.hpp +++ b/src/lib/utils.hpp @@ -21,6 +21,8 @@ #include "nuraft_mesg/mesg_factory.hpp" #include "proto/raft_types.pb.h" +#include "common_lib.hpp" + namespace nuraft_mesg { inline RCMsgBase* fromBaseRequest(nuraft::msg_base const& rcbase) { diff --git a/src/proto/messaging_service.proto b/src/proto/messaging_service.proto index 17bfb7e..8795191 100644 --- a/src/proto/messaging_service.proto +++ b/src/proto/messaging_service.proto @@ -20,7 +20,7 @@ import "raft_types.proto"; package nuraft_mesg; message RaftGroupMsg { - string group_name = 1; + string group_id = 1; RaftMessage msg = 2; string intended_addr = 3; string group_type = 4; diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 0667003..e7b8416 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -18,6 +18,6 @@ if (jungle_FOUND) GTest::gmock ) - add_test(NAME MessagingTest COMMAND messaging_test -csv 3) + add_test(NAME MessagingTest COMMAND messaging_test -cv 0) set_property(TEST MessagingTest PROPERTY RUN_SERIAL 1) endif ()