Skip to content

Commit

Permalink
Fix test_package.
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd committed Oct 4, 2023
1 parent a889037 commit 4fb1fe5
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 83 deletions.
5 changes: 3 additions & 2 deletions src/include/nuraft_mesg/grpc_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// inherited rpc_client instances sharing a common worker pool.
#pragma once

#include <libnuraft/async.hxx>
#include <memory>

#include <folly/SharedMutex.h>
Expand Down Expand Up @@ -45,9 +46,9 @@ class grpc_factory : public nuraft::rpc_client_factory, public std::enable_share
nuraft::ptr< nuraft::rpc_client > create_client(const std::string& client) override;
nuraft::ptr< nuraft::rpc_client > create_client(peer_id_t const& client);

virtual std::error_condition create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0;
virtual nuraft::cmd_result_code create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0;

virtual std::error_condition reinit_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0;
virtual nuraft::cmd_result_code reinit_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) = 0;

// Construct and send an AddServer message to the cluster
NullAsyncResult add_server(uint32_t const srv_id, peer_id_t const& srv_addr, nuraft::srv_config const& dest_cfg);
Expand Down
14 changes: 7 additions & 7 deletions src/include/nuraft_mesg/mesg_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*********************************************************************************/
#pragma once

#include <future>
#include <libnuraft/async.hxx>
#include <memory>
#include <mutex>
#include <string>
Expand All @@ -41,9 +41,9 @@ class group_factory : public grpc_factory {
std::shared_ptr< sisl::GrpcTokenClient > const token_client, std::string const& ssl_cert = "");

using grpc_factory::create_client;
std::error_condition create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) override;
std::error_condition reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) override;
nuraft::cmd_result_code create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >&) override;
nuraft::cmd_result_code reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) override;

virtual std::string lookupEndpoint(peer_id_t const& client) = 0;
};
Expand All @@ -65,10 +65,10 @@ class mesg_factory final : public grpc_factory {

group_id_t group_name() const { return _group_name; }

std::error_condition create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >& rpc_ptr) override;
nuraft::cmd_result_code create_client(peer_id_t const& client, nuraft::ptr< nuraft::rpc_client >& rpc_ptr) override;

std::error_condition reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) override;
nuraft::cmd_result_code reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) override;

AsyncResult< sisl::io_blob > data_service_request(std::string const& request_name, io_blob_list_t const& cli_buf);
};
Expand Down
1 change: 0 additions & 1 deletion src/include/nuraft_mesg/nuraft_mesg.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <list>
#include <memory>
#include <string>
#include <system_error>
#include <vector>

#include <libnuraft/nuraft.hxx>
Expand Down
8 changes: 4 additions & 4 deletions src/lib/grpc_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,16 @@ nuraft::ptr< nuraft::rpc_client > grpc_factory::create_client(peer_id_t const& c
if (_clients.end() != it) {
if (!happened) {
LOGDEBUGMOD(nuraft_mesg, "Re-creating client for {}", client);
if (auto err = reinit_client(client, it->second); err) {
LOGERROR("Failed to re-initialize client {}: {}", client, err.message());
if (auto err = reinit_client(client, it->second); nuraft::OK != err) {
LOGERROR("Failed to re-initialize client {}: {}", client, err);
new_client = std::make_shared< grpc_error_client >();
} else {
new_client = it->second;
}
} else {
LOGDEBUGMOD(nuraft_mesg, "Creating client for {}", client);
if (auto err = create_client(client, it->second); err) {
LOGERROR("Failed to create client for {}: {}", client, err.message());
if (auto err = create_client(client, it->second); nuraft::OK != err) {
LOGERROR("Failed to create client for {}: {}", client, err);
new_client = std::make_shared< grpc_error_client >();
} else {
new_client = it->second;
Expand Down
32 changes: 0 additions & 32 deletions src/lib/manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <sisl/options/options.h>
#include <sisl/grpc/rpc_server.hpp>
#include <sisl/grpc/generic_service.hpp>
#include <system_error>

#include "service.hpp"
#include "nuraft_mesg/mesg_factory.hpp"
Expand All @@ -28,37 +27,6 @@ constexpr auto grpc_server_threads = 1u;

namespace nuraft_mesg {

static std::error_condition convertToError(nuraft::cmd_result_code rc) {
switch (rc) {
case nuraft::OK:
return std::error_condition();
case nuraft::CANCELLED:
return std::make_error_condition(std::errc::operation_canceled);
case nuraft::TIMEOUT:
return std::make_error_condition(std::errc::timed_out);
case nuraft::NOT_LEADER:
return std::make_error_condition(std::errc::permission_denied);
case nuraft::BAD_REQUEST:
return std::make_error_condition(std::errc::invalid_argument);
case nuraft::SERVER_ALREADY_EXISTS:
return std::make_error_condition(std::errc::file_exists);
case nuraft::CONFIG_CHANGING:
return std::make_error_condition(std::errc::interrupted);
case nuraft::SERVER_IS_JOINING:
return std::make_error_condition(std::errc::device_or_resource_busy);
case nuraft::SERVER_NOT_FOUND:
return std::make_error_condition(std::errc::no_such_device);
case nuraft::CANNOT_REMOVE_LEADER:
return std::make_error_condition(std::errc::not_supported);
case nuraft::SERVER_IS_LEAVING:
return std::make_error_condition(std::errc::owner_dead);
case nuraft::FAILED:
[[fallthrough]];
default:
return std::make_error_condition(std::errc::io_error);
}
}

int32_t to_server_id(peer_id_t const& server_addr) {
boost::hash< boost::uuids::uuid > uuid_hasher;
return uuid_hasher(server_addr) >> 33;
Expand Down
2 changes: 0 additions & 2 deletions src/lib/manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
#pragma once

#include <condition_variable>
#include <future>
#include <libnuraft/async.hxx>
#include <list>
#include <map>
#include <mutex>
#include <string>
#include <system_error>

#include <sisl/logging/logging.h>

Expand Down
28 changes: 14 additions & 14 deletions src/lib/mesg_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,24 +134,24 @@ class group_client : public grpc_base_client {
}
};

std::error_condition mesg_factory::create_client(peer_id_t const& client,
nuraft::ptr< nuraft::rpc_client >& raft_client) {
nuraft::cmd_result_code mesg_factory::create_client(peer_id_t const& client,
nuraft::ptr< nuraft::rpc_client >& raft_client) {
// Re-direct this call to a global factory so we can re-use clients to the same endpoints
LOGDEBUGMOD(nuraft_mesg, "Creating client to {}", client);
auto m_client = std::dynamic_pointer_cast< messaging_client >(_group_factory->create_client(to_string(client)));
if (!m_client) { return std::make_error_condition(std::errc::connection_aborted); }
if (!m_client) return nuraft::CANCELLED;
raft_client = std::make_shared< group_client >(m_client, client, _group_name, _group_type, _metrics);
return (!raft_client) ? std::make_error_condition(std::errc::invalid_argument) : std::error_condition();
return (!raft_client) ? nuraft::BAD_REQUEST : nuraft::OK;
}

std::error_condition mesg_factory::reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) {
nuraft::cmd_result_code mesg_factory::reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) {
LOGDEBUGMOD(nuraft_mesg, "Re-init client to {}", client);
auto g_client = std::dynamic_pointer_cast< group_client >(raft_client);
auto new_raft_client = std::static_pointer_cast< nuraft::rpc_client >(g_client->realClient());
if (auto err = _group_factory->reinit_client(client, new_raft_client); err) { return err; }
g_client->setClient(std::dynamic_pointer_cast< messaging_client >(new_raft_client));
return std::error_condition();
return nuraft::OK;
}

AsyncResult< sisl::io_blob > mesg_factory::data_service_request(std::string const& request_name,
Expand All @@ -171,27 +171,27 @@ group_factory::group_factory(int const cli_thread_count, group_id_t const& name,
m_ssl_cert = ssl_cert;
}

std::error_condition group_factory::create_client(peer_id_t const& client,
nuraft::ptr< nuraft::rpc_client >& raft_client) {
nuraft::cmd_result_code group_factory::create_client(peer_id_t const& client,
nuraft::ptr< nuraft::rpc_client >& raft_client) {
LOGDEBUGMOD(nuraft_mesg, "Creating client to {}", client);
auto endpoint = lookupEndpoint(client);
if (endpoint.empty()) { return std::make_error_condition(std::errc::invalid_argument); }
if (endpoint.empty()) nuraft::BAD_REQUEST;

LOGDEBUGMOD(nuraft_mesg, "Creating client for [{}] @ [{}]", client, endpoint);
raft_client =
sisl::GrpcAsyncClient::make< messaging_client >(workerName(), endpoint, m_token_client, "", m_ssl_cert);
return (!raft_client) ? std::make_error_condition(std::errc::connection_aborted) : std::error_condition();
return (!raft_client) ? nuraft::CANCELLED : nuraft::OK;
}

std::error_condition group_factory::reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) {
nuraft::cmd_result_code group_factory::reinit_client(peer_id_t const& client,
std::shared_ptr< nuraft::rpc_client >& raft_client) {
LOGDEBUGMOD(nuraft_mesg, "Re-init client to {}", client);
assert(raft_client);
auto mesg_client = std::dynamic_pointer_cast< messaging_client >(raft_client);
if (!mesg_client->is_connection_ready() || 0 < mesg_client->bad_service.load(std::memory_order_relaxed)) {
return create_client(client, raft_client);
}
return std::error_condition();
return nuraft::OK;
}

} // namespace nuraft_mesg
1 change: 1 addition & 0 deletions src/tests/test_state_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "test_state_manager.h"

#include <fstream>
#include <system_error>

#include "jungle_logstore/jungle_log_store.h"
#include <memory>
Expand Down
42 changes: 21 additions & 21 deletions test_package/example_client.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#include <folly/Expected.h>
#include <iostream>
#include <cassert>

#include <boost/uuid/string_generator.hpp>
#include <libnuraft/async.hxx>
#include <sisl/logging/logging.h>
#include <sisl/options/options.h>
#include <sisl/grpc/rpc_client.hpp>
Expand Down Expand Up @@ -49,14 +51,13 @@ int send_message(uint32_t leader_id, nuraft_mesg::group_id_t const& group_id, st
buf->put(message.c_str());
buf->pos(0);

nuraft::cmd_result_code rc = nuraft::SERVER_IS_JOINING;
while (nuraft::SERVER_IS_JOINING == rc || nuraft::CONFIG_CHANGING == rc) {
rc = factory->client_request(buf, dest_cfg).get();
if (nuraft::SERVER_IS_JOINING == rc || nuraft::CONFIG_CHANGING == rc) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
auto result = nuraft_mesg::NullResult(folly::makeUnexpected(nuraft::SERVER_IS_JOINING));
while (!result && (nuraft::SERVER_IS_JOINING == result.error() || nuraft::CONFIG_CHANGING == result.error())) {
auto sf = factory->client_request(buf, dest_cfg);
std::this_thread::sleep_for(std::chrono::milliseconds(200));
result = std::move(sf).get();
}
int ret = nuraft::OK == rc ? 0 : -1;
int ret = (!!result) ? 0 : -1;
sisl::GrpcAsyncClientWorker::shutdown_all();
return ret;
}
Expand All @@ -66,14 +67,14 @@ int add_new_server(uint32_t leader_id, uint32_t srv_id, nuraft_mesg::group_id_t
auto factory = std::make_shared< mesg_factory >(g_factory, group_id, "test_package");
auto const dest_cfg = srv_config(leader_id, uuids[leader_id]);

nuraft::cmd_result_code rc = nuraft::SERVER_IS_JOINING;
while (nuraft::SERVER_IS_JOINING == rc || nuraft::CONFIG_CHANGING == rc) {
rc = factory->add_server(srv_id, uuids[srv_id], dest_cfg).get();
if (nuraft::SERVER_IS_JOINING == rc || nuraft::CONFIG_CHANGING == rc) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
auto result = nuraft_mesg::NullResult(folly::makeUnexpected(nuraft::SERVER_IS_JOINING));
while (!result && (nuraft::SERVER_IS_JOINING == result.error() || nuraft::CONFIG_CHANGING == result.error())) {
auto srv_addr = boost::uuids::string_generator()(uuids[srv_id]);
auto sf = factory->add_server(srv_id, srv_addr, dest_cfg);
std::this_thread::sleep_for(std::chrono::milliseconds(200));
result = std::move(sf).get();
}
int ret = nuraft::OK == rc ? 0 : -1;
int ret = (!!result) ? 0 : -1;
sisl::GrpcAsyncClientWorker::shutdown_all();
return ret;
}
Expand All @@ -83,14 +84,13 @@ int remove_server(uint32_t leader_id, nuraft_mesg::group_id_t const& group_id, u
auto factory = std::make_shared< mesg_factory >(g_factory, group_id, "test_package");
auto const dest_cfg = srv_config(leader_id, uuids[leader_id]);

nuraft::cmd_result_code rc = nuraft::SERVER_IS_JOINING;
while (nuraft::SERVER_IS_JOINING == rc || nuraft::CONFIG_CHANGING == rc) {
rc = factory->rem_server(srv_id, dest_cfg).get();
if (nuraft::SERVER_IS_JOINING == rc || nuraft::CONFIG_CHANGING == rc) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
auto result = nuraft_mesg::NullResult(folly::makeUnexpected(nuraft::SERVER_IS_JOINING));
while (!result && (nuraft::SERVER_IS_JOINING == result.error() || nuraft::CONFIG_CHANGING == result.error())) {
auto sf = factory->rem_server(srv_id, dest_cfg);
std::this_thread::sleep_for(std::chrono::milliseconds(200));
result = std::move(sf).get();
}
int ret = nuraft::OK == rc ? 0 : -1;
int ret = (!!result) ? 0 : -1;
sisl::GrpcAsyncClientWorker::shutdown_all();
return ret;
}
Expand Down
1 change: 1 addition & 0 deletions test_package/example_state_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "example_state_machine.h"

#include <fstream>
#include <system_error>

#include <nlohmann/json.hpp>

Expand Down

0 comments on commit 4fb1fe5

Please sign in to comment.