Skip to content
This repository has been archived by the owner on Mar 31, 2023. It is now read-only.

Commit

Permalink
[Tracing] Support Jaeger tracing in NCM and TC (#707)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzxgzgz authored Dec 10, 2021
1 parent d3354de commit 42a6edc
Show file tree
Hide file tree
Showing 9 changed files with 318 additions and 37 deletions.
16 changes: 16 additions & 0 deletions services/network_config_manager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,27 @@ Copyright(c) 2020 Futurewei Cloud
</repositories>

<dependencies>
<dependency>
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-tracerresolver</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-spring-cloud-starter</artifactId>
<version>0.3.12</version>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-grpc</artifactId>
<version>0.2.3</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.1.36.Final</version>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-spring-jaeger-cloud-starter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,30 @@ free of charge, to any person obtaining a copy of this software and associated d
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollSocketChannel;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.jaegertracing.Configuration;
import io.jaegertracing.internal.JaegerTracer;
import io.jaegertracing.internal.samplers.ConstSampler;
import io.lettuce.core.dynamic.annotation.Param;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.contrib.tracerresolver.TracerResolver;
import io.opentracing.util.GlobalTracer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.stream.Collectors;
import io.opentracing.Tracer;
import io.opentracing.contrib.grpc.TracingClientInterceptor;

@Service("grpcGoalStateClient")
public class GoalStateClientImpl implements GoalStateClient {
Expand All @@ -62,14 +77,16 @@ public class GoalStateClientImpl implements GoalStateClient {

private ConcurrentHashMap<String, ArrayList<GrpcChannelStub>> hostIpGrpcChannelStubMap;

private final Tracer tracer;

@DurationStatistics
public static GoalStateClientImpl getInstance(int numberOfGrpcChannelPerHost, int numberOfWarmupsPerChannel, ArrayList<String> monitorHosts) {
if (instance == null) {
instance = new GoalStateClientImpl(numberOfGrpcChannelPerHost, numberOfWarmupsPerChannel, monitorHosts);
}
return instance;
}


public GoalStateClientImpl(@Value("${grpc.number-of-channels-per-host:1}") int numberOfGrpcChannelPerHost, @Value("${grpc.number-of-warmups-per-channel:1}") int numberOfWarmupsPerChannel, @Value("")ArrayList<String> monitorHosts) {

if ((this.numberOfGrpcChannelPerHost = numberOfGrpcChannelPerHost) < 1) {
Expand All @@ -82,8 +99,8 @@ public GoalStateClientImpl(@Value("${grpc.number-of-channels-per-host:1}") int n

this.monitorHosts = monitorHosts;
logger.log(Level.FINE, "Printing out all monitorHosts");
for(String host : this.monitorHosts){
logger.log(Level.FINE, "Monitoring this host: "+ host);
for (String host : this.monitorHosts) {
logger.log(Level.FINE, "Monitoring this host: " + host);
}
logger.log(Level.FINE, "Done printing out all monitorHosts");
this.hostAgentPort = 50001;
Expand All @@ -96,20 +113,33 @@ public GoalStateClientImpl(@Value("${grpc.number-of-channels-per-host:1}") int n
new DefaultThreadFactory("grpc-thread-pool"));
//TODO: Setup a connection pool. one ACA, one client.
this.hostIpGrpcChannelStubMap = new ConcurrentHashMap();
logger.log(Level.FINE, "This instance has "+ numberOfGrpcChannelPerHost+" channels, and "+ numberOfWarmupsPerChannel+" warmups");
Configuration.SamplerConfiguration samplerConfiguration = Configuration.SamplerConfiguration.fromEnv()
.withType(ConstSampler.TYPE)
.withParam(1);
Configuration.ReporterConfiguration reporterConfiguration = Configuration.ReporterConfiguration.fromEnv()
.withLogSpans(true);


this.tracer = GlobalTracer.get();//Configuration.fromEnv().getTracer();
//TracerResolver.resolveTracer();
logger.log(Level.INFO, "[GoalStateClientImpl] Got this global tracer: "+this.tracer.toString());
logger.log(Level.FINE, "This instance has " + numberOfGrpcChannelPerHost + " channels, and " + numberOfWarmupsPerChannel + " warmups");

}

@Override
@DurationStatistics
public List<String> sendGoalStates(Map<String, HostGoalState> hostGoalStates) throws Exception {

final CountDownLatch finishLatch = new CountDownLatch(hostGoalStates.values().size());
logger.log(Level.INFO, "Host goal states size: " + hostGoalStates.values().size());
List<String> replies = new ArrayList<>();

for (HostGoalState hostGoalState : hostGoalStates.values()) {
doSendGoalState(hostGoalState, finishLatch, replies);
}


if (!finishLatch.await(1, TimeUnit.MINUTES)) {
logger.log(Level.WARNING, "Send goal states can not finish within 1 minutes");
return Arrays.asList("Send goal states can not finish within 1 minutes");
Expand All @@ -120,6 +150,7 @@ public List<String> sendGoalStates(Map<String, HostGoalState> hostGoalStates) th
return new ArrayList<>();
}

@DurationStatistics
private GrpcChannelStub getOrCreateGrpcChannel(String hostIp) {
if (!this.hostIpGrpcChannelStubMap.containsKey(hostIp)) {
this.hostIpGrpcChannelStubMap.put(hostIp, createGrpcChannelStubArrayList(hostIp));
Expand All @@ -139,20 +170,36 @@ private GrpcChannelStub getOrCreateGrpcChannel(String hostIp) {
return this.hostIpGrpcChannelStubMap.get(hostIp).get(usingChannelWithThisIndex);
}

@DurationStatistics
private ArrayList<GrpcChannelStub> createGrpcChannelStubArrayList(String hostIp) {
long start = System.currentTimeMillis();
ArrayList<GrpcChannelStub> arr = new ArrayList<>();
List<Future<Integer>> channels_warmup_future = new ArrayList<>();
for (int i = 0; i < numberOfGrpcChannelPerHost; i++) {
GrpcChannelStub channelStub = createGrpcChannelStub(hostIp);
warmUpChannelStub(channelStub, hostIp);
arr.add(channelStub);
// wait until all warmups for all channels are finished.
Future<Integer> channel_warmup_future = this.executor.submit(()->{
warmUpChannelStub(channelStub, hostIp);
arr.add(channelStub);
return 1;
});
channels_warmup_future.add(channel_warmup_future);
}
channels_warmup_future.parallelStream().filter(Objects::nonNull).map(future -> {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
}).collect(Collectors.toList());
long end = System.currentTimeMillis();
logger.log(Level.FINE, "[createGrpcChannelStubArrayList] Created " + numberOfGrpcChannelPerHost + " gRPC channel stubs for host " + hostIp + ", elapsed Time in milli seconds: " + (end - start));
return arr;
}

// try to warmup a gRPC channel and its stub, by sending an empty GoalState`.
@DurationStatistics
void warmUpChannelStub(GrpcChannelStub channelStub, String hostIp) {
GoalStateProvisionerGrpc.GoalStateProvisionerStub asyncStub = channelStub.stub;

Expand Down Expand Up @@ -193,22 +240,32 @@ public void onCompleted() {
return;
}

@DurationStatistics
private GrpcChannelStub createGrpcChannelStub(String hostIp) {
// adding tracing stuffs for each channel
TracingClientInterceptor tracingClientInterceptor = TracingClientInterceptor
.newBuilder()
.withTracer(this.tracer)
.withVerbosity()
.withStreaming()
.build();


ManagedChannel channel = ManagedChannelBuilder.forAddress(hostIp, this.hostAgentPort)
.usePlaintext()
.keepAliveWithoutCalls(true)
.keepAliveTime(Long.MAX_VALUE, TimeUnit.SECONDS)
.build();
GoalStateProvisionerGrpc.GoalStateProvisionerStub asyncStub = GoalStateProvisionerGrpc.newStub(channel);

GoalStateProvisionerGrpc.GoalStateProvisionerStub asyncStub = GoalStateProvisionerGrpc.newStub(tracingClientInterceptor.intercept(channel));
return new GrpcChannelStub(channel, asyncStub);

}

private void doSendGoalState(HostGoalState hostGoalState, CountDownLatch finishLatch, List<String> replies) throws InterruptedException {
String hostIp = hostGoalState.getHostIp();
logger.log(Level.FINE, "Setting up a channel to ACA on: " + hostIp);
long start = System.currentTimeMillis();

long end = 0;
GrpcChannelStub channelStub = getOrCreateGrpcChannel(hostIp);
long chan_established = System.currentTimeMillis();
logger.log(Level.FINE, "[doSendGoalState] Established channel, elapsed Time in milli seconds: " + (chan_established - start));
Expand All @@ -218,6 +275,7 @@ private void doSendGoalState(HostGoalState hostGoalState, CountDownLatch finishL
logger.log(Level.FINE, "[doSendGoalState] Established stub, elapsed Time after channel established in milli seconds: " + (stub_established - chan_established));

Map<String, List<Goalstateprovisioner.GoalStateOperationReply.GoalStateOperationStatus>> result = new HashMap<>();

StreamObserver<Goalstateprovisioner.GoalStateOperationReply> responseObserver = new StreamObserver<>() {
@Override
public void onNext(Goalstateprovisioner.GoalStateOperationReply reply) {
Expand All @@ -244,10 +302,18 @@ public void onCompleted() {
};

StreamObserver<Goalstate.GoalStateV2> requestObserver = asyncStub.pushGoalStatesStream(responseObserver);
long requestObserverEstablished = System.currentTimeMillis();
logger.log(Level.FINE, "[doSendGoalState] Established RequestObserver, elapsed Time after stub established in milli seconds: " + (requestObserverEstablished - stub_established));
try {
long before_get_goalState = System.currentTimeMillis();
Goalstate.GoalStateV2 goalState = hostGoalState.getGoalState();
logger.log(Level.INFO, "Sending GS to Host " + hostIp + " as follows | " + goalState.toString());
long after_get_goalState = System.currentTimeMillis();
logger.log(Level.INFO, "Sending GS with size " + goalState.getSerializedSize() + " to Host " + hostIp + " as follows | " + goalState.toString());
requestObserver.onNext(goalState);
long after_onNext = System.currentTimeMillis();
logger.log(Level.FINE, "[doSendGoalState] Get goalstatev2 from HostGoalState in milliseconds: " + (after_get_goalState - before_get_goalState));
logger.log(Level.FINE, "[doSendGoalState] Call onNext in milliseconds: " + (after_onNext - after_get_goalState));

if (hostGoalState.getGoalState().getNeighborStatesCount() == 1 && monitorHosts.contains(hostIp)) {
long sent_gs_time = System.currentTimeMillis();
// If there's only one neighbor state and it is trying to send it to aca_node_one, the IP of which is now
Expand All @@ -265,12 +331,16 @@ public void onCompleted() {
// Mark the end of requests
logger.log(Level.INFO, "Sending GS to Host " + hostIp + " is completed");

// comment out onCompleted so that the same channel/stub and keep sending next time.
end = System.currentTimeMillis();
long onNext_called = System.currentTimeMillis();
logger.log(Level.FINE, "[doSendGoalState] Whole function call took time in milliseconds: "+(end - start) +
" \nFrom established stub to onNext called, elapsed Time after channel established in milli seconds: " + (onNext_called - requestObserverEstablished));
requestObserver.onCompleted();

// shutdown(channel);
}

@DurationStatistics
private void shutdown(ManagedChannel channel) {
try {
channel.shutdown().awaitTermination(Config.SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
Expand Down
Loading

0 comments on commit 42a6edc

Please sign in to comment.