From 09ae1675b0fc7b7679408dd5d19ad0565a4a7153 Mon Sep 17 00:00:00 2001 From: yujingwei Date: Mon, 9 Oct 2023 08:55:46 +0000 Subject: [PATCH] java client support FQDN --- scripts/recompile_thrift.sh | 2 +- src/common/consensus.thrift | 4 +++ src/failure_detector/fd.thrift | 27 ++++++++++------- src/nfs/nfs.thrift | 2 ++ src/runtime/rpc/group_host_port.h | 6 ++-- src/runtime/rpc/rpc_host_port.cpp | 40 ++++++++++++++++++++++++- src/runtime/rpc/rpc_host_port.h | 49 +++++++++++++++++++++++++++++++ 7 files changed, 115 insertions(+), 15 deletions(-) diff --git a/scripts/recompile_thrift.sh b/scripts/recompile_thrift.sh index 444a503483..52e5397be2 100755 --- a/scripts/recompile_thrift.sh +++ b/scripts/recompile_thrift.sh @@ -30,7 +30,7 @@ rm -rf $TMP_DIR mkdir -p $TMP_DIR $THIRDPARTY_ROOT/output/bin/thrift --gen cpp:moveable_types -out $TMP_DIR ../idl/rrdb.thrift -sed 's/#include "dsn_types.h"/#include "runtime\/rpc\/rpc_address.h"\n#include "runtime\/task\/task_code.h"\n#include "utils\/blob.h"/' $TMP_DIR/rrdb_types.h > ../src/include/rrdb/rrdb_types.h +sed 's/#include "dsn_types.h"/#include "runtime\/rpc\/rpc_address.h"\n#include "runtime\/rpc\/rpc_host_port.h"\n#include "runtime\/task\/task_code.h"\n#include "utils\/blob.h"/' $TMP_DIR/rrdb_types.h > ../src/include/rrdb/rrdb_types.h sed 's/#include "rrdb_types.h"/#include /' $TMP_DIR/rrdb_types.cpp > ../src/base/rrdb_types.cpp rm -rf $TMP_DIR diff --git a/src/common/consensus.thrift b/src/common/consensus.thrift index ec862b3364..48cd9b0445 100644 --- a/src/common/consensus.thrift +++ b/src/common/consensus.thrift @@ -143,6 +143,7 @@ struct learn_request // be duplicated (ie. max_gced_decree < confirmed_decree), if not, // learnee will copy the missing logs. 7:optional i64 max_gced_decree; + 8:optional dsn.host_port hp_learner; } struct learn_response @@ -156,6 +157,7 @@ struct learn_response 7:dsn.rpc_address address; // learnee's address 8:string base_local_dir; // base dir of files on learnee 9:optional string replica_disk_tag; // the disk tag of learnee located + 10:optional dsn.host_port hp_address; // learnee's address } struct learn_notify_response @@ -180,6 +182,7 @@ struct group_check_request // Used to deliver child gpid and meta_split_status during partition split 6:optional dsn.gpid child_gpid; 7:optional metadata.split_status meta_split_status; + 8:optional dsn.host_port hp_node; } struct group_check_response @@ -195,5 +198,6 @@ struct group_check_response // if secondary pause or cancel split succeed, is_split_stopped = true 8:optional bool is_split_stopped; 9:optional metadata.disk_status disk_status = metadata.disk_status.NORMAL; + 10:optional dsn.host_port hp_node; } diff --git a/src/failure_detector/fd.thrift b/src/failure_detector/fd.thrift index a85a38a67b..12054106ea 100644 --- a/src/failure_detector/fd.thrift +++ b/src/failure_detector/fd.thrift @@ -30,23 +30,28 @@ namespace cpp dsn.fd struct beacon_msg { - 1: i64 time; - 2: dsn.rpc_address from_addr; - 3: dsn.rpc_address to_addr; - 4: optional i64 start_time; + 1: i64 time; + 2: dsn.rpc_address from_addr; + 3: dsn.rpc_address to_addr; + 4: optional i64 start_time; + 5: optional dsn.host_port hp_from_addr; + 6: optional dsn.host_port hp_to_addr; } struct beacon_ack { - 1: i64 time; - 2: dsn.rpc_address this_node; - 3: dsn.rpc_address primary_node; - 4: bool is_master; - 5: bool allowed; + 1: i64 time; + 2: dsn.rpc_address this_node; + 3: dsn.rpc_address primary_node; + 4: bool is_master; + 5: bool allowed; + 6: optional dsn.host_port hp_this_node; + 7: optional dsn.host_port hp_primary_node; } struct config_master_message { - 1: dsn.rpc_address master; - 2: bool is_register; + 1: dsn.rpc_address master; + 2: bool is_register; + 3: optional dsn.host_port hp_master; } diff --git a/src/nfs/nfs.thrift b/src/nfs/nfs.thrift index 3f0f96bbb9..706e8c6b1e 100644 --- a/src/nfs/nfs.thrift +++ b/src/nfs/nfs.thrift @@ -40,6 +40,7 @@ struct copy_request 8: bool overwrite; 9: optional string source_disk_tag; 10: optional dsn.gpid pid; + 11: optional dsn.host_port hp_source; } struct copy_response @@ -60,6 +61,7 @@ struct get_file_size_request 6: optional string source_disk_tag; 7: optional string dest_disk_tag; 8: optional dsn.gpid pid; + 9: optional dsn.host_port hp_source; } struct get_file_size_response diff --git a/src/runtime/rpc/group_host_port.h b/src/runtime/rpc/group_host_port.h index cf7cab19ba..c6703493a9 100644 --- a/src/runtime/rpc/group_host_port.h +++ b/src/runtime/rpc/group_host_port.h @@ -23,7 +23,6 @@ #include #include "runtime/rpc/group_address.h" -#include "runtime/rpc/group_host_port.h" #include "runtime/rpc/rpc_host_port.h" #include "utils/autoref_ptr.h" #include "utils/fmt_logging.h" @@ -131,7 +130,10 @@ inline rpc_group_host_port::rpc_group_host_port(const rpc_group_address *g_addr) CHECK_TRUE(add(host_port(addr))); } _update_leader_automatically = g_addr->is_update_leader_automatically(); - set_leader(host_port(g_addr->leader())); + auto leader_addr = g_addr->leader(); + if (rpc_address::s_invalid_address != leader_addr) { + set_leader(host_port(leader_addr)); + } } inline rpc_group_host_port &rpc_group_host_port::operator=(const rpc_group_host_port &other) diff --git a/src/runtime/rpc/rpc_host_port.cpp b/src/runtime/rpc/rpc_host_port.cpp index 13c6462b7b..7a37e2b777 100644 --- a/src/runtime/rpc/rpc_host_port.cpp +++ b/src/runtime/rpc/rpc_host_port.cpp @@ -31,6 +31,7 @@ #include "runtime/rpc/rpc_host_port.h" #include "utils/error_code.h" #include "utils/safe_strerror_posix.h" +#include "utils/string_conv.h" #include "utils/utils.h" namespace dsn { @@ -68,8 +69,35 @@ host_port::host_port(std::string host, uint16_t port) CHECK_NE_MSG(rpc_address::ipv4_from_host(_host.c_str()), 0, "invalid hostname: {}", _host); } +bool host_port::from_string(const std::string s) +{ + std::string ip_port = s; + auto pos = ip_port.find_last_of(':'); + if (pos == std::string::npos) { + return false; + } + std::string host = ip_port.substr(0, pos); + std::string port = ip_port.substr(pos + 1); + + // check port + unsigned int port_num; + if (!internal::buf2unsigned(port, port_num) || port_num > UINT16_MAX) { + return false; + } + + if (rpc_address::ipv4_from_host(host.c_str()) == 0) { + return false; + } + + _type = HOST_TYPE_IPV4; + _host = host; + _port = port_num; + return true; +} + host_port::host_port(rpc_address addr) { + reset(); switch (addr.type()) { case HOST_TYPE_IPV4: { CHECK(utils::hostname_from_ip(htonl(addr.ip()), &_host), @@ -79,6 +107,7 @@ host_port::host_port(rpc_address addr) } break; case HOST_TYPE_GROUP: { _group_host_port = new rpc_group_host_port(addr.group_address()); + _group_host_port->add_ref(); } break; default: break; @@ -133,7 +162,7 @@ std::string host_port::to_string() const case HOST_TYPE_GROUP: return fmt::format("address group {}", group_host_port()->name()); default: - return "invalid address"; + return "invalid host_port"; } } @@ -198,4 +227,13 @@ error_s host_port::resolve_addresses(std::vector &addresses) const return error_s::ok(); } +void host_port::fill_host_ports_from_addresses(const std::vector &addr_v, + std::vector &hp_v) +{ + CHECK(hp_v.empty(), "optional host_port should be empty!"); + for (const auto &addr : addr_v) { + hp_v.emplace_back(host_port(addr)); + } +} + } // namespace dsn diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h index be825b22c4..bdd7129c53 100644 --- a/src/runtime/rpc/rpc_host_port.h +++ b/src/runtime/rpc/rpc_host_port.h @@ -41,6 +41,31 @@ class TProtocol; } // namespace thrift } // namespace apache +#define FILL_HP_OPTIONAL_SECTION(OBJ_NAME, SECTION_NAME) \ + do { \ + host_port hp(OBJ_NAME.SECTION_NAME); \ + OBJ_NAME.__set_hp_##SECTION_NAME(hp); \ + } while (false) + +#define FILL_HP_LIST_OPTIONAL_SECTION(OBJ_NAME, SECTION_NAME) \ + do { \ + std::vector hps; \ + dsn::host_port::fill_host_ports_from_addresses(OBJ_NAME.SECTION_NAME, hps); \ + OBJ_NAME.__set_hp_##SECTION_NAME(hps); \ + } while (false) + +#define FILL_OPTIONAL_HP_IF_NEEDED(OBJ_NAME, SECTION_NAME) \ + do { \ + if (!OBJ_NAME.__isset.hp_##SECTION_NAME) \ + FILL_HP_OPTIONAL_SECTION(OBJ_NAME, SECTION_NAME); \ + } while (false) + +#define FILL_OPTIONAL_HP_LIST_IF_NEEDED(OBJ_NAME, SECTION_NAME) \ + do { \ + if (!OBJ_NAME.__isset.hp_##SECTION_NAME) \ + FILL_HP_LIST_OPTIONAL_SECTION(OBJ_NAME, SECTION_NAME); \ + } while (false) + namespace dsn { class rpc_group_host_port; @@ -83,10 +108,17 @@ class host_port // Trere may be multiple rpc_addresses for one host_port. error_s resolve_addresses(std::vector &addresses) const; + // This function is used for validating the format of string like "test_host:1234". + bool from_string(const std::string s); + // for serialization in thrift format uint32_t read(::apache::thrift::protocol::TProtocol *iprot); uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + static void fill_host_ports_from_addresses(const std::vector &addr_v, + /*output*/ std::vector &hp_v); + + private: std::string _host; uint16_t _port = 0; @@ -116,6 +148,23 @@ inline bool operator==(const host_port &hp1, const host_port &hp2) inline bool operator!=(const host_port &hp1, const host_port &hp2) { return !(hp1 == hp2); } +inline bool operator<(const host_port &hp1, const host_port &hp2) +{ + if (hp1.type() != hp2.type()) { + return hp1.type() < hp2.type(); + } + + switch (hp1.type()) { + case HOST_TYPE_IPV4: + return hp1.host() < hp2.host() || (hp1.host() == hp2.host() && hp1.port() < hp2.port()); + case HOST_TYPE_GROUP: + return reinterpret_cast(hp1.group_host_port()) < + reinterpret_cast(hp2.group_host_port()); + default: + return true; + } +} + } // namespace dsn USER_DEFINED_STRUCTURE_FORMATTER(::dsn::host_port);