Skip to content

Commit

Permalink
ISSUE-65 new cluster interface
Browse files Browse the repository at this point in the history
  • Loading branch information
huyumars committed Mar 30, 2022
1 parent 8d15fe0 commit 3792c27
Show file tree
Hide file tree
Showing 21 changed files with 444 additions and 411 deletions.
2 changes: 1 addition & 1 deletion src/app_demo/should_be_generated/app/RequestReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ RequestReceiver::RequestReceiver(
mCommandQueue(commandQueue) {
grpc::reflection::InitProtoReflectionServerBuilderPlugin();
auto node = gringofts::app::AppInfo::getMyNode();
mIpPort = node.mHostName + ":" + std::to_string(node.mPortForGateway);
mIpPort = node->hostName()+ ":" + std::to_string(node->gateWayPort());
assert(mIpPort != "UNKNOWN");

mTlsConfOpt = TlsUtil::parseTlsConf(reader, "tls");
Expand Down
162 changes: 160 additions & 2 deletions src/app_util/AppInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,170 @@ See the License for the specific language governing permissions and
limitations under the License.
**************************************************************************/

#include <absl/strings/str_split.h>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <spdlog/spdlog.h>

#include "AppInfo.h"

namespace gringofts::app {

void AppInfo::init(const INIReader &reader) {
bool AppClusterParser::checkHasRoute(const std::string &routeStr, uint64_t clusterId, uint64_t epoch) {
using boost::property_tree::ptree;
using boost::property_tree::read_json;
using boost::property_tree::write_json;
using boost::property_tree::json_parser_error;
std::stringstream ss(routeStr);
ptree globalRoute;
try {
read_json(ss, globalRoute);
auto routeEpoch = std::stoi(globalRoute.get_child("epoch").data());
if (routeEpoch < epoch) {
SPDLOG_WARN("global epoch {} is less than local epoch {}", routeEpoch, epoch);
}
auto infos = globalRoute.get_child("routeInfos");
for (auto &[k, v] : infos) {
auto clusterNode = v.get_child("clusterId");
auto id = std::stoi(clusterNode.data());
if (clusterId == id) {
std::stringstream sout;
write_json(sout, v);
SPDLOG_INFO("find route for cluster {} : {}", clusterId, sout.str());
return true;
}
}
return false;
} catch (const json_parser_error &err) {
SPDLOG_ERROR("error when parse json {} for {}", routeStr, err.message());
return false;
} catch (const std::exception &err) {
SPDLOG_ERROR("error when parse json {} for {}", routeStr, err.what());
return false;
}
}

std::tuple<NodeId, ClusterId, ClusterParser::ClusterMap> AppClusterParser::parse(const INIReader &iniReader) {
std::string storeType = iniReader.Get("cluster", "persistence.type", "UNKNOWN");
assert(storeType == "raft");
bool externalEnabled = iniReader.GetBoolean("raft.external", "enable", false);
if (!externalEnabled) {
/// load from local config, the cluster id and node id must be specified
auto clusterConf = iniReader.Get("cluster", "cluster.conf", "");
auto allClusterInfo = parseToClusterInfo(clusterConf);
auto myClusterId = iniReader.GetInteger("cluster", "self.clusterId", 0);
auto myNodeId = iniReader.GetInteger("cluster", "self.nodeId", 0);
bool hasMe = false;
for (auto &[clusterId, info] : allClusterInfo) {
if (myClusterId == clusterId) {
for (auto &[nodeId, node] : info.getAllNodes()) {
if (nodeId == myNodeId) {
hasMe = true;
break;
}
}
}
}
assert(hasMe);

Signal::hub.handle<RouteSignal>([](const Signal &s) {
const auto &signal = dynamic_cast<const RouteSignal &>(s);
SPDLOG_WARN("for non-external controlled cluster direct start raft");
signal.passValue(true);
});

SPDLOG_INFO("read raft cluster conf from local, "
"cluster.conf={}, self.clusterId={}, self.nodeId={}",
clusterConf, myClusterId, myNodeId);
return {myClusterId, myNodeId, allClusterInfo};
} else {
// if enable external kv store for cluster info, it must have kv factory
assert(mKvFactory);
std::string externalConfigFile = iniReader.Get("raft.external", "config.file", "");
std::string clusterConfKey = iniReader.Get("raft.external", "cluster.conf.key", "");
std::string clusterRouteKey = iniReader.Get("raft.external", "cluster.route.key", "");
assert(!externalConfigFile.empty());
assert(!clusterConfKey.empty());

/// init external client
auto client = mKvFactory->produce(INIReader(externalConfigFile));
/// read raft cluster conf from external
std::string clusterConf;
auto r = client->getValue(clusterConfKey, &clusterConf);
assert(!clusterConfKey.empty());
auto allClusterInfo = parseToClusterInfo(clusterConf);
/// N.B.: when using external, the assumption is two nodes will never run on the same host,
/// otherwise below logic will break.
auto myHostname = Util::getHostname();
std::optional<ClusterId> myClusterId = std::nullopt;
std::optional<NodeId> myNodeId = std::nullopt;
for (auto &[clusterId, info] : allClusterInfo) {
for (auto &[nodeId, node] : info.getAllNodes()) {
if (myHostname == node->hostName()) {
myClusterId = clusterId;
myNodeId = nodeId;
break;
}
}
}
assert(myClusterId);
assert(myNodeId);

SPDLOG_INFO("raft cluster conf passed from external, "
"cluster.conf={}, hostname={}", clusterConf, myHostname);
auto clusterId = *myClusterId;

Signal::hub.handle<RouteSignal>([client, clusterRouteKey, clusterId](const Signal &s) {
const auto &signal = dynamic_cast<const RouteSignal &>(s);
std::string val;
SPDLOG_INFO("receive signal for query route, cluster {}, epoch {}", clusterId, signal.mEpoch);
assert(clusterId == signal.mClusterId);
assert(!clusterRouteKey.empty());
client->getValue(clusterRouteKey, &val);
signal.passValue(checkHasRoute(val, clusterId, signal.mEpoch));
});

return {*myClusterId, *myNodeId, allClusterInfo};
}
}

std::unordered_map<ClusterId, Cluster> AppClusterParser::parseToClusterInfo(const std::string &infoStr) const {
std::unordered_map<ClusterId, Cluster> result;
std::vector<std::string> clusters = absl::StrSplit(infoStr, ";");
for (auto &c : clusters) {
Cluster info;
std::pair<std::string, std::string> clusterIdWithNodes = absl::StrSplit(c, "#");
info.setClusterId(std::stoi(clusterIdWithNodes.first));
std::vector<std::string> nodes = absl::StrSplit(clusterIdWithNodes.second, ",");
for (auto &n : nodes) {
std::pair<std::string, std::string> hostWithPort = absl::StrSplit(n, ":");
std::pair<std::string, std::string> idWithHost = absl::StrSplit(hostWithPort.first, "@");
auto nodeId = std::stoi(idWithHost.first);
auto hostname = idWithHost.second;
std::shared_ptr<Node> node;
if (hostWithPort.second.empty()) {
SPDLOG_INFO("{} no specific port, using default one", hostWithPort.second);
node = std::make_shared<AppNode>(nodeId, hostname);
} else {
std::vector<std::string> ports = absl::StrSplit(hostWithPort.second, "|");
assert(ports.size() == 6);
auto portForRaft = std::stoi(ports[0]);
auto portForGateway = std::stoi(ports[1]);
auto portForDumper = std::stoi(ports[2]);
auto portForStream = std::stoi(ports[3]);
auto portForNetAdmin = std::stoi(ports[4]);
auto portForScale = std::stoi(ports[5]);
node = std::make_shared<AppNode>(nodeId, hostname, portForRaft, portForStream,
portForGateway, portForDumper, portForNetAdmin, portForScale);
}
info.addNode(node);
}
result[info.id()] = info;
}
return result;
}

void AppInfo::init(const INIReader &reader, std::unique_ptr<ClusterParser> parser) {
auto &appInfo = getInstance();

auto &initialized = appInfo.initialized;
Expand All @@ -28,7 +185,7 @@ void AppInfo::init(const INIReader &reader) {
return;
}

auto[myClusterId, myNodeId, allClusterInfo] = ClusterInfo::resolveAllClusters(reader, nullptr);
auto[myClusterId, myNodeId, allClusterInfo] = parser->parse(reader);
appInfo.mMyClusterId = myClusterId;
appInfo.mMyNodeId = myNodeId;
appInfo.mAllClusterInfo = allClusterInfo;
Expand All @@ -54,4 +211,5 @@ void AppInfo::init(const INIReader &reader) {
appInfo.mMyClusterId,
appInfo.mMyNodeId);
}

} /// namespace gringofts::app
72 changes: 59 additions & 13 deletions src/app_util/AppInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,59 @@ limitations under the License.

#include <INIReader.h>


#include "../infra/common_types.h"
#include "../infra/util/ClusterInfo.h"
#include "../infra/util/Cluster.h"
#include "../infra/util/Signal.h"
#include "../infra/util/KVClient.h"

namespace gringofts {
namespace app {

struct RouteSignal : public FutureSignal<bool> {
RouteSignal(uint64_t epoch, uint64_t clusterId) : mEpoch(epoch), mClusterId(clusterId) {}
uint64_t mEpoch;
uint64_t mClusterId;
};

class AppNode : public RaftNode {
static constexpr Port kDefaultGatewayPort = 50055;
static constexpr Port kDefaultFetchPort = 50056;
static constexpr Port kDefaultNetAdminPort = 50065;
static constexpr Port kDefaultScalePort = 61203;
public:
AppNode(NodeId id, const HostName &hostName) : RaftNode(id, hostName) {}
AppNode(NodeId id, const HostName &hostName,
Port raftPort, Port streamPort,
Port gateWayPort, Port dumperPort, Port netAdminPort, Port scalePort)
: RaftNode(id, hostName, raftPort, streamPort),
mPortForGateway(gateWayPort), mPortForFetch(dumperPort),
mPortForNetAdmin(netAdminPort), mPortForScale(scalePort) {
}
inline Port gateWayPort() const { return mPortForGateway; }
inline Port fetchPort() const { return mPortForFetch; }
inline Port netAdminPort() const { return mPortForNetAdmin; }
inline Port scalePort() const { return mPortForScale; }
private:
Port mPortForGateway = kDefaultGatewayPort;
Port mPortForFetch = kDefaultFetchPort;
Port mPortForNetAdmin = kDefaultNetAdminPort;
Port mPortForScale = kDefaultScalePort;
};

class AppClusterParser : public ClusterParser {
public:
AppClusterParser() : mKvFactory(nullptr) {}
explicit AppClusterParser(std::unique_ptr<kv::ClientFactory> factory) : mKvFactory(std::move(factory)) {}
std::tuple<NodeId, ClusterId, ClusterMap> parse(const INIReader &) override;

static bool checkHasRoute(const std::string &routeStr, uint64_t clusterId, uint64_t epoch);

private:
std::unordered_map<ClusterId, Cluster> parseToClusterInfo(const std::string &infoStr) const;

std::unique_ptr<kv::ClientFactory> mKvFactory;
};

class AppInfo final {
public:
~AppInfo() = default;
Expand All @@ -32,7 +78,8 @@ class AppInfo final {
getInstance().initialized = false;
}

static void init(const INIReader &reader);
static void init(const INIReader &reader,
std::unique_ptr<ClusterParser> parser = std::make_unique<AppClusterParser>());

/// disallow copy ctor and copy assignment
AppInfo(const AppInfo &) = delete;
Expand All @@ -44,16 +91,16 @@ class AppInfo final {
static bool stressTestEnabled() { return getInstance().mStressTestEnabled; }
static std::string appVersion() { return getInstance().mAppVersion; }

static ClusterInfo getMyClusterInfo() {
static Cluster getMyClusterInfo() {
return getInstance().mAllClusterInfo[getInstance().mMyClusterId];
}

static ClusterInfo::Node getMyNode() {
static std::shared_ptr<AppNode> getMyNode() {
assert(getInstance().initialized);
return getMyClusterInfo().getAllNodeInfo()[getMyNodeId()];
return std::dynamic_pointer_cast<AppNode>(getMyClusterInfo().getAllNodes()[getMyNodeId()]);
}

static std::optional<ClusterInfo> getClusterInfo(uint64_t clusterId) {
static std::optional<Cluster> getClusterInfo(uint64_t clusterId) {
if (getInstance().mAllClusterInfo.count(clusterId)) {
return getInstance().mAllClusterInfo[clusterId];
} else {
Expand All @@ -65,20 +112,19 @@ class AppInfo final {
static ClusterId getMyClusterId() { return getInstance().mMyClusterId; }

static Port netAdminPort() {
auto node = getMyClusterInfo().getAllNodeInfo()[getMyNodeId()];
return node.mPortForNetAdmin;
return getMyNode()->netAdminPort();
}

static Port scalePort() {
return getMyNode().mPortForScale;
return getMyNode()->scalePort();
}

static Port gatewayPort() {
return getMyNode().mPortForGateway;
return getMyNode()->gateWayPort();
}

static Port fetchPort() {
return getMyNode().mPortForFetcher;
return getMyNode()->fetchPort();
}

private:
Expand Down Expand Up @@ -123,7 +169,7 @@ class AppInfo final {
/**
* Cluster Info
*/
std::map<ClusterId, ClusterInfo> mAllClusterInfo;
std::unordered_map<ClusterId, Cluster> mAllClusterInfo;
ClusterId mMyClusterId;
NodeId mMyNodeId;
};
Expand Down
8 changes: 4 additions & 4 deletions src/app_util/NetAdminServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ using gringofts::raft::RaftRole;
class NetAdminServer final : public AppNetAdmin::Service {
public:
NetAdminServer(const INIReader &reader,
std::shared_ptr<NetAdminServiceProvider> netAdminProxy, uint64_t port = kDefaultNetAdminPort) :
std::shared_ptr<NetAdminServiceProvider> netAdminProxy, uint64_t port = AppInfo::netAdminPort()) :
mServiceProvider(netAdminProxy),
mSnapshotTakenCounter(getCounter("snapshot_taken_counter", {})),
mSnapshotFailedCounter(getCounter("snapshot_failed_counter", {})),
Expand Down Expand Up @@ -240,9 +240,9 @@ class NetAdminServer final : public AppNetAdmin::Service {
}

std::vector<std::string> targets;
for (const auto &nodeKV : fromClusterOpt->getAllNodeInfo()) {
auto &node = nodeKV.second;
auto targetHost = node.mHostName + ":" + std::to_string(node.mPortForStream);
for (const auto &nodeKV : fromClusterOpt->getAllNodes()) {
auto node = std::dynamic_pointer_cast<AppNode>(nodeKV.second);
auto targetHost = node->hostName() + ":" + std::to_string(node->streamPort());
SPDLOG_INFO("set up sync target {}", targetHost);
targets.push_back(std::move(targetHost));
}
Expand Down
3 changes: 1 addition & 2 deletions src/app_util/control/CtrlState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
#include <spdlog/spdlog.h>

#include "../AppInfo.h"
#include "../../infra/util/ClusterInfo.h"
#include "../../infra/util/Signal.h"
#include "../../infra/raft/RaftSignal.h"

Expand Down Expand Up @@ -82,7 +81,7 @@ void CtrlState::recoverForEAL(std::string_view str) {
// for cluster Id > 0, need to start raft
// for cluster = 0, direct start raft
if (hasState() && mClusterId > 0) {
auto routeSignal = std::make_shared<gringofts::RouteSignal>(mEpoch, mClusterId);
auto routeSignal = std::make_shared<gringofts::app::RouteSignal>(mEpoch, mClusterId);
// query route info to guarantee it can start raft
Signal::hub << routeSignal;
if (routeSignal->getFuture().get()) {
Expand Down
2 changes: 1 addition & 1 deletion src/infra/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ set(GRINGOFTS_MONITOR_SRC

set(GRINGOFTS_UTIL_SRC
util/BigDecimal.cpp
util/ClusterInfo.cpp
util/Cluster.cpp
util/CryptoUtil.cpp
util/FileUtil.cpp
util/TrackingMemoryResource.cpp
Expand Down
4 changes: 2 additions & 2 deletions src/infra/raft/RaftBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace raft {
inline std::shared_ptr<RaftInterface> buildRaftImpl(
const char *configPath,
const NodeId &nodeId,
const ClusterInfo &clusterInfo,
const Cluster &cluster,
std::shared_ptr<DNSResolver> dnsResolver = nullptr,
RaftRole role = RaftRole::Follower ) {
INIReader iniReader(configPath);
Expand All @@ -46,7 +46,7 @@ inline std::shared_ptr<RaftInterface> buildRaftImpl(
/// use default dns resolver
dnsResolver = std::make_shared<DNSResolver>();
}
return std::make_shared<v2::RaftCore>(configPath, nodeId, clusterInfo, dnsResolver, role);
return std::make_shared<v2::RaftCore>(configPath, nodeId, cluster, dnsResolver, role);
} else {
SPDLOG_ERROR("Unknown raft implement version {}.", version);
exit(1);
Expand Down
Loading

0 comments on commit 3792c27

Please sign in to comment.