Skip to content

Commit

Permalink
[ISSUE #10797]Enhance the registration logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
stone-98 committed Oct 7, 2023
1 parent f3fb428 commit 0ba1817
Showing 1 changed file with 74 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,18 @@
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.packagescan.resource.Resource;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.ServerRequestHandler;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.common.utils.TlsTypeResolve;
import com.alibaba.nacos.common.utils.ThreadFactoryBuilder;
import com.alibaba.nacos.common.utils.TlsTypeResolve;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
Expand All @@ -54,16 +54,15 @@
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;

import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -356,56 +355,59 @@ public Connection connectToServer(ServerInfo serverInfo) {
int port = serverInfo.getServerPort() + rpcPortOffset();
ManagedChannel managedChannel = createNewManagedChannel(serverInfo.getServerIp(), port);
RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(managedChannel);
if (newChannelStubTemp != null) {

Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (response == null || !(response instanceof ServerCheckResponse)) {
shuntDownChannel(managedChannel);

Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (!(response instanceof ServerCheckResponse)) {
shuntDownChannel(managedChannel);
return null;
}
// submit ability table as soon as possible
// ability table will be null if server doesn't support ability table
ServerCheckResponse serverCheckResponse = (ServerCheckResponse) response;
connectionId = serverCheckResponse.getConnectionId();

BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(
newChannelStubTemp.getChannel());
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(connectionId);
// if not supported, it will be false
if (serverCheckResponse.isSupportAbilityNegotiation()) {
// mark
this.recAbilityContext.reset(grpcConn);
}

//create stream request and bind connection event to this connection.
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);

// stream observer to send response to server
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
grpcConn.setChannel(managedChannel);
//send a setup request.
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
// set ability table
conSetupRequest.setAbilityTable(
NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(abilityMode()));
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
// wait for response
if (recAbilityContext.isNeedToSync()) {
// try to wait for notify response
boolean waitForResponse = recAbilityContext.await(this.clientConfig.capabilityNegotiationTimeout(),
TimeUnit.MILLISECONDS);
if (!waitForResponse) {
// haven't received a response for registration; need to register again.
return null;
}

// submit ability table as soon as possible
// ability table will be null if server doesn't support ability table
ServerCheckResponse serverCheckResponse = (ServerCheckResponse) response;
connectionId = serverCheckResponse.getConnectionId();

BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(
newChannelStubTemp.getChannel());
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(connectionId);
// if not supported, it will be false
if (serverCheckResponse.isSupportAbilityNegotiation()) {
// mark
this.recAbilityContext.reset(grpcConn);
}

//create stream request and bind connection event to this connection.
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);

// stream observer to send response to server
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
grpcConn.setChannel(managedChannel);
//send a setup request.
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
// set ability table
conSetupRequest.setAbilityTable(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(abilityMode()));
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
// wait for response
if (recAbilityContext.isNeedToSync()) {
// try to wait for notify response
recAbilityContext.await(this.clientConfig.capabilityNegotiationTimeout(), TimeUnit.MILLISECONDS);
} else {
// leave for adapting old version server
// wait to register connection setup
Thread.sleep(100L);
}
return grpcConn;
} else {
// leave for adapting old version server
// registration is considered successful by default after 100ms
// wait to register connection setup
Thread.sleep(100L);
}
return null;
return grpcConn;
} catch (Exception e) {
LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
// remove and notify
Expand All @@ -425,17 +427,17 @@ public Connection connectToServer(ServerInfo serverInfo) {
protected void afterReset(ConnectResetRequest request) {
recAbilityContext.release(null);
}

/**
* This is for receiving server abilities.
*/
class RecAbilityContext {

static class RecAbilityContext {
/**
* connection waiting for server abilities.
*/
private volatile Connection connection;

/**
* way to block client.
*/
Expand Down Expand Up @@ -484,19 +486,24 @@ public void release(Map<String, Boolean> abilities) {
}
this.needToSync = false;
}

/**
* await for abilities.
* Wait for a specified duration for a condition to be met.
*
* @param timeout timeout.
* @param unit unit.
* @throws InterruptedException by blocker.
* @param timeout The maximum time to wait.
* @param unit The time unit for the timeout.
* @return true if the condition was successfully awaited, false otherwise.
* @throws InterruptedException if the waiting thread is interrupted.
*/
public void await(long timeout, TimeUnit unit) throws InterruptedException {
if (this.blocker != null) {
this.blocker.await(timeout, unit);
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
if (blocker != null) {
boolean waitForResponse = blocker.await(timeout, unit);
if (waitForResponse) {
needToSync = false;
}
return waitForResponse;
}
this.needToSync = false;
return false;
}
}

Expand Down

0 comments on commit 0ba1817

Please sign in to comment.