From d8c6b25135f2ac974927e13f17c675dc1016493f Mon Sep 17 00:00:00 2001 From: yujingwei Date: Wed, 18 Oct 2023 16:53:34 +0800 Subject: [PATCH] feat: java client support FQDN --- .../org/apache/pegasus/base/host_port.java | 250 ++++++++++++++++++ .../org/apache/pegasus/base/rpc_address.java | 5 + .../apache/pegasus/rpc/async/MetaSession.java | 13 + .../pegasus/rpc/async/TableHandler.java | 3 +- 4 files changed, 270 insertions(+), 1 deletion(-) create mode 100644 java-client/src/main/java/org/apache/pegasus/base/host_port.java diff --git a/java-client/src/main/java/org/apache/pegasus/base/host_port.java b/java-client/src/main/java/org/apache/pegasus/base/host_port.java new file mode 100644 index 0000000000..775d466306 --- /dev/null +++ b/java-client/src/main/java/org/apache/pegasus/base/host_port.java @@ -0,0 +1,250 @@ +package org.apache.pegasus.base; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.*; + +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.apache.thrift.TFieldIdEnum; +import org.apache.thrift.meta_data.FieldMetaData; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TStruct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TODO + * + * @author yujingwei + * @version 1.0.0 + * @since 2023/10/09 16:30 + */ +public final class host_port + implements TBase, java.io.Serializable, Cloneable{ + private static final TStruct STRUCT_DESC = new TStruct("host_port"); + + private static final Logger LOG = LoggerFactory.getLogger(host_port.class); + + public String hostAddress; + + public short port; + + /** + * The set of fields this struct contains, along with convenience methods for finding and + * manipulating them. + */ + public enum _Fields implements TFieldIdEnum { + ; + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** Find the _Fields constant that matches fieldId, or null if its not found. */ + public static _Fields findByThriftId(int fieldId) { + switch (fieldId) { + default: + return null; + } + } + + /** Find the _Fields constant that matches fieldId, throwing an exception if it is not found. */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) + throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** Find the _Fields constant that matches name, or null if its not found. */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + public static final Map<_Fields, FieldMetaData> metaDataMap; + + static { + Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class); + metaDataMap = Collections.unmodifiableMap(tmpMap); + FieldMetaData.addStructMetaDataMap(host_port.class, metaDataMap); + } + + public host_port() { + this.hostAddress = null; + this.port = 0; + } + + + public boolean isInvalid() { + return this.hostAddress == null || this.port == 0 ; + } + + + public String get_ip() throws UnknownHostException{ + return getInetAddress(hostAddress).toString(); + } + + public String getIpAndPort(){ + return get_ip() + get_port(); + } + + public short get_port() { + return port; + } + + public static InetAddress getInetAddress(final String host) { + InetAddress[] addrs = getAllInetAddresses(host); + if (addrs != null && addrs.length > 0) { + return addrs[0]; + } + return null; + } + + public static InetAddress[] getAllInetAddresses(final String host) { + final long start = System.nanoTime(); + try { + InetAddress[] ipAddrs = InetAddress.getAllByName(host); + long latency = System.nanoTime() - start; + if (latency > 500000/*ns*/ && LOG.isDebugEnabled()) { + LOG.debug("Resolved IP of `{}' to {} in {}ns", host, ipAddrs, latency); + } else if (latency >= 3000000/*ns*/) { + LOG.warn("Slow DNS lookup! Resolved IP of `{}' to {} in {}ns", host, ipAddrs, latency); + } + return ipAddrs; + } catch (UnknownHostException e) { + LOG.error("Failed to resolve the IP of `{}' in {}ns", host, (System.nanoTime() - start)); + return null; + } + } + + /** Performs a deep copy on other. */ + public host_port(host_port other) { + this.hostAddress = other.hostAddress; + this.port = other.port; + } + + public host_port deepCopy() { + return new host_port(this); + } + + @Override + public void clear() { + this.hostAddress = null; + this.port = 0; + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + } + throw new IllegalStateException(); + } + + /** + * Returns true if field corresponding to fieldID is set (has been asigned a value) and false + * otherwise + */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) return false; + if (that instanceof host_port) return this.equals((host_port) that); + return false; + } + + public boolean equals(host_port that) { + if (that == null) return false; + return this.hostAddress.equals(that.hostAddress) && this.port == that.port; + } + + @Override + public int hashCode() { + int result = 17; + result = 31 * result + (hostAddress != null ? hostAddress.hashCode() : 0); + result = 31 * result + port; + return result; + } + + public int compareTo(host_port other) { + if(other == null){ + return 1; + } + + int comparsion = this.hostAddress.compareTo(other.hostAddress); + if(comparsion == 0){ + return comparsion; + } + + return Integer.compare(this.port, other.port); + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(TProtocol iprot) throws TException { + port = iprot.readI16(); + hostAddress = iprot.readString(); + // check for required fields of primitive type, which can't be checked in the validate method + validate(); + } + + public void write(TProtocol oprot) throws TException { + validate(); + oprot.writeString(hostAddress); + oprot.writeI16(port); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("host_port("); + try { + sb.append(get_ip()); + } catch (UnknownHostException e) { + sb.append("invalid_addr"); + } + sb.append(":"); + sb.append(get_port()); + sb.append(")"); + return sb.toString(); + } + + public void validate() throws TException { + // check for required fields + } +} diff --git a/java-client/src/main/java/org/apache/pegasus/base/rpc_address.java b/java-client/src/main/java/org/apache/pegasus/base/rpc_address.java index dfcad0175d..1bbb745535 100644 --- a/java-client/src/main/java/org/apache/pegasus/base/rpc_address.java +++ b/java-client/src/main/java/org/apache/pegasus/base/rpc_address.java @@ -34,6 +34,7 @@ import org.apache.thrift.meta_data.FieldMetaData; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TStruct; +import org.apache.pegasus.base.host_port; public final class rpc_address implements TBase, java.io.Serializable, Cloneable { @@ -106,6 +107,10 @@ public rpc_address() { this.address = 0; } + public rpc_address(host_port hp){ + this.address = hp.getIp(); + } + public boolean isInvalid() { return this.address == 0; } diff --git a/java-client/src/main/java/org/apache/pegasus/rpc/async/MetaSession.java b/java-client/src/main/java/org/apache/pegasus/rpc/async/MetaSession.java index 31bd9ac331..eecd68621b 100644 --- a/java-client/src/main/java/org/apache/pegasus/rpc/async/MetaSession.java +++ b/java-client/src/main/java/org/apache/pegasus/rpc/async/MetaSession.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import org.apache.pegasus.base.error_code; import org.apache.pegasus.base.rpc_address; +import org.apache.pegasus.base.host_port; import org.apache.pegasus.operator.client_operator; import org.apache.pegasus.operator.create_app_operator; import org.apache.pegasus.operator.drop_app_operator; @@ -76,6 +77,7 @@ public MetaSession( this.eachQueryTimeoutInMills = eachQueryTimeoutInMills; this.defaultMaxQueryCount = defaultMaxQueryCount; this.group = g; + this.hostPort = null; } public static error_code.error_types getMetaServiceError(client_operator metaQueryOp) { @@ -106,6 +108,7 @@ public static rpc_address getMetaServiceForwardAddress(client_operator metaQuery return null; java.util.List partitions = op.get_response().getPartitions(); if (partitions == null || partitions.isEmpty()) return null; + addr = tygGetHostPort(partitions); addr = partitions.get(0).getPrimary(); if (addr == null || addr.isInvalid()) return null; } @@ -113,6 +116,16 @@ public static rpc_address getMetaServiceForwardAddress(client_operator metaQuery return addr; } + public rpc_address tygGetHostPort(java.util.List partitions){ + hostPort = partitions.get(0).getHp_primary(); + if(hostPort != null){ + rpc_address addr = new rpc_address(hostPort); + } + else{ + return partitions.get(0).getPrimary(); + } + } + public final void asyncExecute(client_operator op, Runnable callbackFunc, int maxExecuteCount) { if (maxExecuteCount == 0) { maxExecuteCount = defaultMaxQueryCount; diff --git a/java-client/src/main/java/org/apache/pegasus/rpc/async/TableHandler.java b/java-client/src/main/java/org/apache/pegasus/rpc/async/TableHandler.java index 33247acf58..045dda2878 100644 --- a/java-client/src/main/java/org/apache/pegasus/rpc/async/TableHandler.java +++ b/java-client/src/main/java/org/apache/pegasus/rpc/async/TableHandler.java @@ -180,7 +180,7 @@ void initTableConfiguration(query_cfg_response resp) { continue; } - replicaConfig.primaryAddress = pc.getPrimary(); + replicaConfig.primaryAddress = pc.getHp_primary(); // If the primary address is invalid, we don't create secondary session either. // Because all of these sessions will be recreated later. if (replicaConfig.primaryAddress.isInvalid()) { @@ -192,6 +192,7 @@ void initTableConfiguration(query_cfg_response resp) { // backup request is enabled, get all secondary sessions if (isBackupRequestEnabled()) { // secondary sessions + pc.hp_secondaries.forEach(); pc.secondaries.forEach( secondary -> { ReplicaSession session = tryConnect(secondary, futureGroup);