Skip to content

Commit

Permalink
Support metric for grpc server executor (#11428)
Browse files Browse the repository at this point in the history
* support metric for grpc server executor

* remove unused field

* add copyright

* add ut
  • Loading branch information
Daydreamer-ia authored Dec 6, 2023
1 parent 92f36b1 commit 01a28ee
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.core.monitor;

import com.alibaba.nacos.core.remote.grpc.GrpcClusterServer;
import com.alibaba.nacos.core.remote.grpc.GrpcSdkServer;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.IntervalTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.ThreadPoolExecutor;

/**
* Used to collect grpc server executor metrics.
*
* @author Daydreamer-ia
*/
@Component
public class GrpcServerThreadPoolMonitor implements SchedulingConfigurer {

@Resource
private GrpcSdkServer sdkServer;

@Resource
private GrpcClusterServer clusterServer;

@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
Boolean enabled = EnvUtil.getProperty("nacos.metric.grpc.server.executor.enabled", Boolean.class, true);
if (!enabled) {
return;
}
taskRegistrar.addFixedRateTask(new IntervalTask(() -> {
// sdk server
ThreadPoolExecutor sdkServerRpcExecutor = sdkServer.getRpcExecutor();
MetricsMonitor.getSdkServerExecutorMetric().getTaskCount().set(sdkServerRpcExecutor.getTaskCount());
MetricsMonitor.getSdkServerExecutorMetric().getCompletedTaskCount().set(sdkServerRpcExecutor.getCompletedTaskCount());
MetricsMonitor.getSdkServerExecutorMetric().getInQueueTaskCount().set(sdkServerRpcExecutor.getQueue().size());
MetricsMonitor.getSdkServerExecutorMetric().getActiveCount().set(sdkServerRpcExecutor.getActiveCount());
MetricsMonitor.getSdkServerExecutorMetric().getCorePoolSize().set(sdkServerRpcExecutor.getCorePoolSize());
MetricsMonitor.getSdkServerExecutorMetric().getMaximumPoolSize().set(sdkServerRpcExecutor.getMaximumPoolSize());
MetricsMonitor.getSdkServerExecutorMetric().getPoolSize().set(sdkServerRpcExecutor.getPoolSize());

// cluster server
ThreadPoolExecutor clusterServerRpcExecutor = clusterServer.getRpcExecutor();
MetricsMonitor.getClusterServerExecutorMetric().getTaskCount().set(clusterServerRpcExecutor.getTaskCount());
MetricsMonitor.getClusterServerExecutorMetric().getCompletedTaskCount().set(clusterServerRpcExecutor.getCompletedTaskCount());
MetricsMonitor.getClusterServerExecutorMetric().getInQueueTaskCount().set(clusterServerRpcExecutor.getQueue().size());
MetricsMonitor.getClusterServerExecutorMetric().getActiveCount().set(clusterServerRpcExecutor.getActiveCount());
MetricsMonitor.getClusterServerExecutorMetric().getCorePoolSize().set(clusterServerRpcExecutor.getCorePoolSize());
MetricsMonitor.getClusterServerExecutorMetric().getMaximumPoolSize().set(clusterServerRpcExecutor.getMaximumPoolSize());
MetricsMonitor.getClusterServerExecutorMetric().getPoolSize().set(clusterServerRpcExecutor.getPoolSize());
}, Integer.parseInt(EnvUtil.getProperty("nacos.metric.grpc.server.executor.interval", "15000")), 1000L));
}
}
137 changes: 136 additions & 1 deletion core/src/main/java/com/alibaba/nacos/core/monitor/MetricsMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* The Metrics center.
Expand All @@ -47,6 +48,10 @@ public final class MetricsMonitor {

private static AtomicInteger longConnection = new AtomicInteger();

private static GrpcServerExecutorMetric sdkServerExecutorMetric = new GrpcServerExecutorMetric("grpcSdkServer");

private static GrpcServerExecutorMetric clusterServerExecutorMetric = new GrpcServerExecutorMetric("grpcClusterServer");

private static Map<String, AtomicInteger> moduleConnectionCnt = new ConcurrentHashMap<>();

static {
Expand Down Expand Up @@ -75,7 +80,53 @@ public final class MetricsMonitor {
tags.add(immutableTag);
tags.add(new ImmutableTag("name", "longConnection"));
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, longConnection);


tags = new ArrayList<>();
tags.add(immutableTag);
tags.add(new ImmutableTag("type", sdkServerExecutorMetric.getType()));
initGrpcServerExecutorMetric(tags, sdkServerExecutorMetric);

tags = new ArrayList<>();
tags.add(immutableTag);
tags.add(new ImmutableTag("type", clusterServerExecutorMetric.getType()));
initGrpcServerExecutorMetric(tags, clusterServerExecutorMetric);
}

private static void initGrpcServerExecutorMetric(List<Tag> tags, GrpcServerExecutorMetric metric) {
List<Tag> snapshotTags = new ArrayList<>();
snapshotTags.add(new ImmutableTag("name", "activeCount"));
snapshotTags.addAll(tags);
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "grpc_server_executor", snapshotTags, metric.getActiveCount());

snapshotTags = new ArrayList<>();
snapshotTags.add(new ImmutableTag("name", "poolSize"));
snapshotTags.addAll(tags);
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "grpc_server_executor", snapshotTags, metric.getPoolSize());

snapshotTags = new ArrayList<>();
snapshotTags.add(new ImmutableTag("name", "corePoolSize"));
snapshotTags.addAll(tags);
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "grpc_server_executor", snapshotTags, metric.getCorePoolSize());

snapshotTags = new ArrayList<>();
snapshotTags.add(new ImmutableTag("name", "maximumPoolSize"));
snapshotTags.addAll(tags);
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "grpc_server_executor", snapshotTags, metric.getMaximumPoolSize());

snapshotTags = new ArrayList<>();
snapshotTags.add(new ImmutableTag("name", "inQueueTaskCount"));
snapshotTags.addAll(tags);
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "grpc_server_executor", snapshotTags, metric.getInQueueTaskCount());

snapshotTags = new ArrayList<>();
snapshotTags.add(new ImmutableTag("name", "taskCount"));
snapshotTags.addAll(tags);
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "grpc_server_executor", snapshotTags, metric.getTaskCount());

snapshotTags = new ArrayList<>();
snapshotTags.add(new ImmutableTag("name", "completedTaskCount"));
snapshotTags.addAll(tags);
NacosMeterRegistryCenter.gauge(METER_REGISTRY, "grpc_server_executor", snapshotTags, metric.getCompletedTaskCount());
}

public static AtomicInteger getLongConnectionMonitor() {
Expand Down Expand Up @@ -106,6 +157,90 @@ public static DistributionSummary getRaftFromLeader() {
return RAFT_FROM_LEADER;
}

public static GrpcServerExecutorMetric getSdkServerExecutorMetric() {
return sdkServerExecutorMetric;
}

public static GrpcServerExecutorMetric getClusterServerExecutorMetric() {
return clusterServerExecutorMetric;
}

public static class GrpcServerExecutorMetric {

private String type;

/**
* cout of thread are running job.
*/
private AtomicInteger activeCount = new AtomicInteger();

/**
* core thread count.
*/
private AtomicInteger corePoolSize = new AtomicInteger();

/**
* current thread count.
*/
private AtomicInteger poolSize = new AtomicInteger();

/**
* max thread count.
*/
private AtomicInteger maximumPoolSize = new AtomicInteger();

/**
* task count in queue.
*/
private AtomicInteger inQueueTaskCount = new AtomicInteger();

/**
* completed task count.
*/
private AtomicLong completedTaskCount = new AtomicLong();

/**
* task count.
*/
private AtomicLong taskCount = new AtomicLong();

private GrpcServerExecutorMetric(String type) {
this.type = type;
}

public AtomicInteger getActiveCount() {
return activeCount;
}

public AtomicInteger getCorePoolSize() {
return corePoolSize;
}

public AtomicInteger getPoolSize() {
return poolSize;
}

public AtomicInteger getMaximumPoolSize() {
return maximumPoolSize;
}

public AtomicInteger getInQueueTaskCount() {
return inQueueTaskCount;
}

public AtomicLong getCompletedTaskCount() {
return completedTaskCount;
}

public AtomicLong getTaskCount() {
return taskCount;
}

public String getType() {
return type;
}
}

/**
* refresh all module connection count.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,44 @@ public void initMeterRegistry() {
NacosMeterRegistryCenter.getMeterRegistry(NacosMeterRegistryCenter.CORE_STABLE_REGISTRY)
.add(new SimpleMeterRegistry());
}

@Test
public void testSdkServerExecutorMetric() {
MetricsMonitor.getSdkServerExecutorMetric().getPoolSize().set(1);
MetricsMonitor.getSdkServerExecutorMetric().getMaximumPoolSize().set(1);
MetricsMonitor.getSdkServerExecutorMetric().getCorePoolSize().set(1);
MetricsMonitor.getSdkServerExecutorMetric().getActiveCount().set(1);
MetricsMonitor.getSdkServerExecutorMetric().getInQueueTaskCount().set(1);
MetricsMonitor.getSdkServerExecutorMetric().getTaskCount().set(1);
MetricsMonitor.getSdkServerExecutorMetric().getCompletedTaskCount().set(1);
Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getType(), "grpcSdkServer");
Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getPoolSize().get(), 1);
Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getMaximumPoolSize().get(), 1);
Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getCorePoolSize().get(), 1);
Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getActiveCount().get(), 1);
Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getInQueueTaskCount().get(), 1);
Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getTaskCount().get(), 1);
Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getCompletedTaskCount().get(), 1);
}

@Test
public void testClusterServerExecutorMetric() {
MetricsMonitor.getClusterServerExecutorMetric().getPoolSize().set(1);
MetricsMonitor.getClusterServerExecutorMetric().getMaximumPoolSize().set(1);
MetricsMonitor.getClusterServerExecutorMetric().getCorePoolSize().set(1);
MetricsMonitor.getClusterServerExecutorMetric().getActiveCount().set(1);
MetricsMonitor.getClusterServerExecutorMetric().getInQueueTaskCount().set(1);
MetricsMonitor.getClusterServerExecutorMetric().getTaskCount().set(1);
MetricsMonitor.getClusterServerExecutorMetric().getCompletedTaskCount().set(1);
Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getType(), "grpcClusterServer");
Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getPoolSize().get(), 1);
Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getMaximumPoolSize().get(), 1);
Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getCorePoolSize().get(), 1);
Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getActiveCount().get(), 1);
Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getInQueueTaskCount().get(), 1);
Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getTaskCount().get(), 1);
Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getCompletedTaskCount().get(), 1);
}

@Test
public void testGetLongConnectionMonitor() {
Expand Down

0 comments on commit 01a28ee

Please sign in to comment.