Skip to content

Commit

Permalink
Merge branch 'develop' into fix/npe
Browse files Browse the repository at this point in the history
# Conflicts:
#	common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java
  • Loading branch information
Daydreamer-ia committed Oct 10, 2023
2 parents c9d7785 + 7911eb0 commit fa30a7f
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;

import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -367,68 +366,66 @@ public Connection connectToServer(ServerInfo serverInfo) {
int port = serverInfo.getServerPort() + rpcPortOffset();
ManagedChannel managedChannel = createNewManagedChannel(serverInfo.getServerIp(), port);
RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(managedChannel);
if (newChannelStubTemp != null) {

Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (response == null || !(response instanceof ServerCheckResponse)) {
shuntDownChannel(managedChannel);
return null;
}

// submit ability table as soon as possible
// ability table will be null if server doesn't support ability table
ServerCheckResponse serverCheckResponse = (ServerCheckResponse) response;
connectionId = serverCheckResponse.getConnectionId();

BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(
newChannelStubTemp.getChannel());
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(connectionId);
// if not supported, it will be false
if (serverCheckResponse.isSupportAbilityNegotiation()) {
// mark
this.recAbilityContext.reset(grpcConn);
// promise null if no abilities receive
grpcConn.setAbilityTable(null);
}

//create stream request and bind connection event to this connection.
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);

// stream observer to send response to server
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
grpcConn.setChannel(managedChannel);
//send a setup request.
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
// set ability table
conSetupRequest.setAbilityTable(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(abilityMode()));
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
// wait for response
if (recAbilityContext.isNeedToSync()) {
// try to wait for notify response
recAbilityContext.await(this.clientConfig.capabilityNegotiationTimeout(), TimeUnit.MILLISECONDS);
// if no server abilities receiving, then reconnect
if (!grpcConn.isAbilitiesSet()) {
LOGGER.error("Client don't receive server abilities table even empty table but server supports ability negotiation."
+ " You can check if it is need to adjust the timeout of ability negotiation by property: {}"
+ " if always fail to connect.",
GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT);
grpcConn.setAbandon(true);
grpcConn.close();
return null;
}
} else {
// leave for adapting old version server
// wait to register connection setup
Thread.sleep(100L);
Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (!(response instanceof ServerCheckResponse)) {
shuntDownChannel(managedChannel);
return null;
}
// submit ability table as soon as possible
// ability table will be null if server doesn't support ability table
ServerCheckResponse serverCheckResponse = (ServerCheckResponse) response;
connectionId = serverCheckResponse.getConnectionId();

BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(
newChannelStubTemp.getChannel());
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(connectionId);
// if not supported, it will be false
if (serverCheckResponse.isSupportAbilityNegotiation()) {
// mark
this.recAbilityContext.reset(grpcConn);
// promise null if no abilities receive
grpcConn.setAbilityTable(null);
}

//create stream request and bind connection event to this connection.
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);

// stream observer to send response to server
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
grpcConn.setChannel(managedChannel);
//send a setup request.
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
// set ability table
conSetupRequest.setAbilityTable(
NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(abilityMode()));
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
// wait for response
if (recAbilityContext.isNeedToSync()) {
// try to wait for notify response
recAbilityContext.await(this.clientConfig.capabilityNegotiationTimeout(), TimeUnit.MILLISECONDS);
// if no server abilities receiving, then reconnect
if (!grpcConn.isAbilitiesSet()) {
LOGGER.error("Client don't receive server abilities table even empty table but server supports ability negotiation."
+ " You can check if it is need to adjust the timeout of ability negotiation by property: {}"
+ " if always fail to connect.",
GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT);
grpcConn.setAbandon(true);
grpcConn.close();
return null;
}
return grpcConn;
} else {
// leave for adapting old version server
// registration is considered successful by default after 100ms
// wait to register connection setup
Thread.sleep(100L);
}
return null;
return grpcConn;
} catch (Exception e) {
LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
// remove and notify
Expand All @@ -452,7 +449,7 @@ protected void afterReset(ConnectResetRequest request) {
/**
* This is for receiving server abilities.
*/
class RecAbilityContext {
static class RecAbilityContext {

/**
* connection waiting for server abilities.
Expand Down
2 changes: 1 addition & 1 deletion distribution/bin/shutdown.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
cd `dirname $0`/../target
target_dir=`pwd`

pid=`ps ax | grep -i 'nacos.nacos' | grep ${target_dir} | grep java | grep -v grep | awk '{print $1}'`
pid=`pgrep -f nacos.nacos`
if [ -z "$pid" ] ; then
echo "No nacosServer running."
exit -1;
Expand Down
2 changes: 1 addition & 1 deletion distribution/bin/startup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,4 @@ else
nohup "$JAVA" "$JAVA_OPT_EXT_FIX" ${JAVA_OPT} nacos.nacos >> ${BASE_DIR}/logs/start.out 2>&1 &
fi

echo "nacos is startingyou can check the ${BASE_DIR}/logs/start.out"
echo "nacos is starting. you can check the ${BASE_DIR}/logs/start.out"

0 comments on commit fa30a7f

Please sign in to comment.