Skip to content

Commit

Permalink
Compiles, UTs still to go.
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd committed Sep 28, 2023
1 parent 3533be0 commit 62b38e3
Show file tree
Hide file tree
Showing 12 changed files with 492 additions and 464 deletions.
1 change: 0 additions & 1 deletion .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ AlignConsecutiveDeclarations: false
AlignEscapedNewlines: Right
AlignOperands: false
AlignTrailingComments: true
AllowShortBlocksOnASingleLine: true
AllowShortIfStatementsOnASingleLine: true
AllowShortBlocksOnASingleLine: true
AllowShortCaseLabelsOnASingleLine: false
Expand Down
44 changes: 44 additions & 0 deletions apply-clang-format.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/bin/bash

# Parse args
read -r -d '' USAGE << EOM
apply-clang-format.sh [-v]
-v validates formatting, returns exit 1 on formatting errors
EOM

while getopts "v" opt; do
case $opt in
v)
VALIDATE=true;;
*)
echo "$USAGE"
exit 1;;
\?)
echo "Invalid option: -$OPTARG" >&2
exit 1;;
:)
echo "Option $OPTARG requires an argument." >&2
exit 1
esac
done

find ./src -iname '*.h' -o -iname '*.cpp' -o -iname '*.hpp' -o -iname '*.cc' | xargs clang-format -style=file -i -fallback-style=none

if [ $VALIDATE ]; then
EXIT_CODE=0
PATCH_FILE="clang_format.patch"
git diff > $PATCH_FILE

# Delete if 0 size
if [ -s $PATCH_FILE ]
then
echo "Code is not according to clang-format-8. Run ./apply-clang-format.sh before committing"
clang-format --version
echo "How to install clang-format-8: https://jirap.corp.ebay.com/browse/MONSTOR-10256"
echo "#### Format Issue:"
cat $PATCH_FILE
EXIT_CODE=1
fi
rm $PATCH_FILE
exit $EXIT_CODE
fi
4 changes: 2 additions & 2 deletions src/include/messaging_if.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class repl_service_ctx {

// data service api client call
virtual AsyncResult< sisl::io_blob > data_service_request(std::string const& request_name,
io_blob_list_t const& cli_buf);
io_blob_list_t const& cli_buf) = 0;

// Send response to a data service request and finish the async call.
virtual void send_data_service_response(io_blob_list_t const& outgoing_buf,
Expand Down Expand Up @@ -122,7 +122,7 @@ class Manager {
virtual ~Manager() = default;

// Register a new group type
virtual void register_mgr_type(std::string const& group_type, group_params const&);
virtual void register_mgr_type(std::string const& group_type, group_params const&) = 0;

virtual std::shared_ptr< mesg_state_mgr > lookup_state_manager(std::string const& group_id) const = 0;
virtual NullAsyncResult create_group(std::string const& group_id, std::string const& group_type) = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/data_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace sisl {
struct io_blob;
class GenericRpcData;
}
} // namespace sisl
namespace nuraft_mesg {

using data_service_request_handler_t =
Expand Down
4 changes: 2 additions & 2 deletions src/lib/grpc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ static std::shared_ptr< nuraft::req_msg > toRequest(RaftMessage const& raft_msg)
for (auto const& log : req.log_entries()) {
auto log_buffer = nuraft::buffer::alloc(log.buffer().size());
memcpy(log_buffer->data(), log.buffer().data(), log.buffer().size());
log_entries.push_back(
std::make_shared< nuraft::log_entry >(log.term(), log_buffer, (nuraft::log_val_type)log.type(), log.timestamp()));
log_entries.push_back(std::make_shared< nuraft::log_entry >(log.term(), log_buffer,
(nuraft::log_val_type)log.type(), log.timestamp()));
}
return message;
}
Expand Down
265 changes: 140 additions & 125 deletions src/lib/messaging.cpp

Large diffs are not rendered by default.

12 changes: 5 additions & 7 deletions src/include/messaging.hpp → src/lib/messaging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ class msg_service;
class group_metrics;

class service : public Manager {
Manager::Params _start_params;
Manager::Params start_params_;
int32_t _srv_id;

std::map< std::string, Manager::group_params > _state_mgr_types;

std::weak_ptr< MessagingApplication > application_;
std::shared_ptr< group_factory > _g_factory;
std::shared_ptr< msg_service > _mesg_service;
std::unique_ptr<::sisl::GrpcServer > _grpc_server;
std::unique_ptr< ::sisl::GrpcServer > _grpc_server;

std::mutex mutable _manager_lock;
std::map< std::string, std::shared_ptr< mesg_state_mgr > > _state_managers;
Expand All @@ -61,7 +62,7 @@ class service : public Manager {
void exit_group(std::string const& group_id);

public:
service();
service(Manager::Params const&, std::weak_ptr< MessagingApplication >, bool and_data_svc = false);
~service() override;

int32_t server_id() const override { return _srv_id; }
Expand All @@ -74,13 +75,10 @@ class service : public Manager {
NullResult join_group(std::string const& group_id, std::string const& group_type,
std::shared_ptr< mesg_state_mgr > smgr) override;

void start(Manager::Params& start_params);

virtual NullAsyncResult add_member(std::string const& group_id, std::string const& server_id) override;
virtual NullAsyncResult rem_member(std::string const& group_id, std::string const& server_id) override;
virtual NullAsyncResult become_leader(std::string const& group_id) override;
virtual NullAsyncResult client_request(std::string const& group_id,
std::shared_ptr< nuraft::buffer >&) override;
virtual NullAsyncResult client_request(std::string const& group_id, std::shared_ptr< nuraft::buffer >&) override;

void leave_group(std::string const& group_id) override;
uint32_t logstore_id(std::string const& group_id) const override;
Expand Down
30 changes: 15 additions & 15 deletions src/lib/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,15 @@ grpc_server_wrapper::grpc_server_wrapper(group_name_t const& group_name) {
if (0 < SISL_OPTIONS.count("msg_metrics")) m_metrics = std::make_shared< group_metrics >(group_name);
}

msg_service::msg_service(get_server_ctx_cb get_server_ctx, process_offload_cb process_offload,
std::string const& service_address, bool const enable_data_service) :
msg_service::msg_service(get_server_ctx_cb get_server_ctx, std::string const& service_address,
bool const enable_data_service) :
_get_server_ctx(get_server_ctx),
_get_process_offload(process_offload),
_service_address(service_address),
_data_service_enabled(enable_data_service) {}

std::shared_ptr< msg_service > msg_service::create(get_server_ctx_cb get_server_ctx, process_offload_cb poc,
std::string const& service_address, bool const enable_data_service) {
return std::shared_ptr< msg_service >(new msg_service(get_server_ctx, poc, service_address, enable_data_service),
std::shared_ptr< msg_service > msg_service::create(get_server_ctx_cb get_server_ctx, std::string const& service_address,
bool const enable_data_service) {
return std::shared_ptr< msg_service >(new msg_service(get_server_ctx, service_address, enable_data_service),
[](msg_service* p) { delete p; });
}

Expand Down Expand Up @@ -208,15 +207,16 @@ bool msg_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMsg,
// to offload the Raft append operations onto a seperate thread group.
response.set_group_name(group_name);
if (server) {
if (auto offload = _get_process_offload(request.group_type()); nullptr != offload) {
offload([rpc_data, server]() {
auto& request = rpc_data->request();
auto& response = rpc_data->response();
rpc_data->set_status(server->step(request.msg(), *response.mutable_msg()));
rpc_data->send_response();
});
return false;
}
/// TODO replace this ugly hack
//if (auto offload = _get_process_offload(request.group_type()); nullptr != offload) {
// offload([rpc_data, server]() {
// auto& request = rpc_data->request();
// auto& response = rpc_data->response();
// rpc_data->set_status(server->step(request.msg(), *response.mutable_msg()));
// rpc_data->send_response();
// });
// return false;
//}
try {
rpc_data->set_status(server->step(request.msg(), *response.mutable_msg()));
return true;
Expand Down
11 changes: 3 additions & 8 deletions src/lib/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ using get_server_ctx_cb =
// pluggable type for data service
using data_service_t = data_service_grpc;

// A calback that returns the registered callback for for offloading RAFT request processing
using process_offload_cb = std::function< std::function< void(std::function< void() >) >(group_type_t const&) >;

struct grpc_server_wrapper {
explicit grpc_server_wrapper(group_name_t const& group_name);

Expand All @@ -61,7 +58,6 @@ struct grpc_server_wrapper {

class msg_service : public std::enable_shared_from_this< msg_service > {
get_server_ctx_cb _get_server_ctx;
process_offload_cb _get_process_offload;
std::mutex _raft_sync_lock;
std::condition_variable_any _raft_servers_sync;
lock_type _raft_servers_lock;
Expand All @@ -71,13 +67,12 @@ class msg_service : public std::enable_shared_from_this< msg_service > {
data_service_t _data_service;
bool _data_service_enabled;

msg_service(get_server_ctx_cb get_server_ctx, process_offload_cb process_offload,
std::string const& service_address, bool const enable_data_service);
msg_service(get_server_ctx_cb get_server_ctx, std::string const& service_address, bool const enable_data_service);
~msg_service();

public:
static std::shared_ptr< msg_service > create(get_server_ctx_cb get_server_ctx, process_offload_cb poc,
std::string const& service_address, bool const enable_data_service);
static std::shared_ptr< msg_service > create(get_server_ctx_cb get_server_ctx, std::string const& service_address,
bool const enable_data_service);

msg_service(msg_service const&) = delete;
msg_service& operator=(msg_service const&) = delete;
Expand Down
Loading

0 comments on commit 62b38e3

Please sign in to comment.