diff --git a/core/src/main/java/com/alibaba/nacos/core/monitor/GrpcServerThreadPoolMonitor.java b/core/src/main/java/com/alibaba/nacos/core/monitor/GrpcServerThreadPoolMonitor.java new file mode 100644 index 00000000000..132d0976025 --- /dev/null +++ b/core/src/main/java/com/alibaba/nacos/core/monitor/GrpcServerThreadPoolMonitor.java @@ -0,0 +1,72 @@ +/* + * Copyright 1999-2023 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.monitor; + +import com.alibaba.nacos.core.remote.grpc.GrpcClusterServer; +import com.alibaba.nacos.core.remote.grpc.GrpcSdkServer; +import com.alibaba.nacos.sys.env.EnvUtil; +import org.springframework.scheduling.annotation.SchedulingConfigurer; +import org.springframework.scheduling.config.IntervalTask; +import org.springframework.scheduling.config.ScheduledTaskRegistrar; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Used to collect grpc server executor metrics. + * + * @author Daydreamer-ia + */ +@Component +public class GrpcServerThreadPoolMonitor implements SchedulingConfigurer { + + @Resource + private GrpcSdkServer sdkServer; + + @Resource + private GrpcClusterServer clusterServer; + + @Override + public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { + Boolean enabled = EnvUtil.getProperty("nacos.metric.grpc.server.executor.enabled", Boolean.class, true); + if (!enabled) { + return; + } + taskRegistrar.addFixedRateTask(new IntervalTask(() -> { + // sdk server + ThreadPoolExecutor sdkServerRpcExecutor = sdkServer.getRpcExecutor(); + MetricsMonitor.getSdkServerExecutorMetric().getTaskCount().set(sdkServerRpcExecutor.getTaskCount()); + MetricsMonitor.getSdkServerExecutorMetric().getCompletedTaskCount().set(sdkServerRpcExecutor.getCompletedTaskCount()); + MetricsMonitor.getSdkServerExecutorMetric().getInQueueTaskCount().set(sdkServerRpcExecutor.getQueue().size()); + MetricsMonitor.getSdkServerExecutorMetric().getActiveCount().set(sdkServerRpcExecutor.getActiveCount()); + MetricsMonitor.getSdkServerExecutorMetric().getCorePoolSize().set(sdkServerRpcExecutor.getCorePoolSize()); + MetricsMonitor.getSdkServerExecutorMetric().getMaximumPoolSize().set(sdkServerRpcExecutor.getMaximumPoolSize()); + MetricsMonitor.getSdkServerExecutorMetric().getPoolSize().set(sdkServerRpcExecutor.getPoolSize()); + + // cluster server + ThreadPoolExecutor clusterServerRpcExecutor = clusterServer.getRpcExecutor(); + MetricsMonitor.getClusterServerExecutorMetric().getTaskCount().set(clusterServerRpcExecutor.getTaskCount()); + MetricsMonitor.getClusterServerExecutorMetric().getCompletedTaskCount().set(clusterServerRpcExecutor.getCompletedTaskCount()); + MetricsMonitor.getClusterServerExecutorMetric().getInQueueTaskCount().set(clusterServerRpcExecutor.getQueue().size()); + MetricsMonitor.getClusterServerExecutorMetric().getActiveCount().set(clusterServerRpcExecutor.getActiveCount()); + MetricsMonitor.getClusterServerExecutorMetric().getCorePoolSize().set(clusterServerRpcExecutor.getCorePoolSize()); + MetricsMonitor.getClusterServerExecutorMetric().getMaximumPoolSize().set(clusterServerRpcExecutor.getMaximumPoolSize()); + MetricsMonitor.getClusterServerExecutorMetric().getPoolSize().set(clusterServerRpcExecutor.getPoolSize()); + }, Integer.parseInt(EnvUtil.getProperty("nacos.metric.grpc.server.executor.interval", "15000")), 1000L)); + } +} diff --git a/core/src/main/java/com/alibaba/nacos/core/monitor/MetricsMonitor.java b/core/src/main/java/com/alibaba/nacos/core/monitor/MetricsMonitor.java index 5a56e5f95c0..594e539b327 100644 --- a/core/src/main/java/com/alibaba/nacos/core/monitor/MetricsMonitor.java +++ b/core/src/main/java/com/alibaba/nacos/core/monitor/MetricsMonitor.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * The Metrics center. @@ -47,6 +48,10 @@ public final class MetricsMonitor { private static AtomicInteger longConnection = new AtomicInteger(); + private static GrpcServerExecutorMetric sdkServerExecutorMetric = new GrpcServerExecutorMetric("grpcSdkServer"); + + private static GrpcServerExecutorMetric clusterServerExecutorMetric = new GrpcServerExecutorMetric("grpcClusterServer"); + private static Map moduleConnectionCnt = new ConcurrentHashMap<>(); static { @@ -75,7 +80,53 @@ public final class MetricsMonitor { tags.add(immutableTag); tags.add(new ImmutableTag("name", "longConnection")); NacosMeterRegistryCenter.gauge(METER_REGISTRY, "nacos_monitor", tags, longConnection); - + + tags = new ArrayList<>(); + tags.add(immutableTag); + tags.add(new ImmutableTag("type", sdkServerExecutorMetric.getType())); + initGrpcServerExecutorMetric(tags, sdkServerExecutorMetric); + + tags = new ArrayList<>(); + tags.add(immutableTag); + tags.add(new ImmutableTag("type", clusterServerExecutorMetric.getType())); + initGrpcServerExecutorMetric(tags, clusterServerExecutorMetric); + } + + private static void initGrpcServerExecutorMetric(List tags, GrpcServerExecutorMetric metric) { + List snapshotTags = new ArrayList<>(); + snapshotTags.add(new ImmutableTag("name", "activeCount")); + snapshotTags.addAll(tags); + NacosMeterRegistryCenter.gauge(METER_REGISTRY, "grpc_server_executor", snapshotTags, metric.getActiveCount()); + + snapshotTags = new ArrayList<>(); + snapshotTags.add(new ImmutableTag("name", "poolSize")); + snapshotTags.addAll(tags); + NacosMeterRegistryCenter.gauge(METER_REGISTRY, "grpc_server_executor", snapshotTags, metric.getPoolSize()); + + snapshotTags = new ArrayList<>(); + snapshotTags.add(new ImmutableTag("name", "corePoolSize")); + snapshotTags.addAll(tags); + NacosMeterRegistryCenter.gauge(METER_REGISTRY, "grpc_server_executor", snapshotTags, metric.getCorePoolSize()); + + snapshotTags = new ArrayList<>(); + snapshotTags.add(new ImmutableTag("name", "maximumPoolSize")); + snapshotTags.addAll(tags); + NacosMeterRegistryCenter.gauge(METER_REGISTRY, "grpc_server_executor", snapshotTags, metric.getMaximumPoolSize()); + + snapshotTags = new ArrayList<>(); + snapshotTags.add(new ImmutableTag("name", "inQueueTaskCount")); + snapshotTags.addAll(tags); + NacosMeterRegistryCenter.gauge(METER_REGISTRY, "grpc_server_executor", snapshotTags, metric.getInQueueTaskCount()); + + snapshotTags = new ArrayList<>(); + snapshotTags.add(new ImmutableTag("name", "taskCount")); + snapshotTags.addAll(tags); + NacosMeterRegistryCenter.gauge(METER_REGISTRY, "grpc_server_executor", snapshotTags, metric.getTaskCount()); + + snapshotTags = new ArrayList<>(); + snapshotTags.add(new ImmutableTag("name", "completedTaskCount")); + snapshotTags.addAll(tags); + NacosMeterRegistryCenter.gauge(METER_REGISTRY, "grpc_server_executor", snapshotTags, metric.getCompletedTaskCount()); } public static AtomicInteger getLongConnectionMonitor() { @@ -106,6 +157,90 @@ public static DistributionSummary getRaftFromLeader() { return RAFT_FROM_LEADER; } + public static GrpcServerExecutorMetric getSdkServerExecutorMetric() { + return sdkServerExecutorMetric; + } + + public static GrpcServerExecutorMetric getClusterServerExecutorMetric() { + return clusterServerExecutorMetric; + } + + public static class GrpcServerExecutorMetric { + + private String type; + + /** + * cout of thread are running job. + */ + private AtomicInteger activeCount = new AtomicInteger(); + + /** + * core thread count. + */ + private AtomicInteger corePoolSize = new AtomicInteger(); + + /** + * current thread count. + */ + private AtomicInteger poolSize = new AtomicInteger(); + + /** + * max thread count. + */ + private AtomicInteger maximumPoolSize = new AtomicInteger(); + + /** + * task count in queue. + */ + private AtomicInteger inQueueTaskCount = new AtomicInteger(); + + /** + * completed task count. + */ + private AtomicLong completedTaskCount = new AtomicLong(); + + /** + * task count. + */ + private AtomicLong taskCount = new AtomicLong(); + + private GrpcServerExecutorMetric(String type) { + this.type = type; + } + + public AtomicInteger getActiveCount() { + return activeCount; + } + + public AtomicInteger getCorePoolSize() { + return corePoolSize; + } + + public AtomicInteger getPoolSize() { + return poolSize; + } + + public AtomicInteger getMaximumPoolSize() { + return maximumPoolSize; + } + + public AtomicInteger getInQueueTaskCount() { + return inQueueTaskCount; + } + + public AtomicLong getCompletedTaskCount() { + return completedTaskCount; + } + + public AtomicLong getTaskCount() { + return taskCount; + } + + public String getType() { + return type; + } + } + /** * refresh all module connection count. * diff --git a/core/src/test/java/com/alibaba/nacos/core/monitor/MetricsMonitorTest.java b/core/src/test/java/com/alibaba/nacos/core/monitor/MetricsMonitorTest.java index 2547284fbae..c957e5136fe 100644 --- a/core/src/test/java/com/alibaba/nacos/core/monitor/MetricsMonitorTest.java +++ b/core/src/test/java/com/alibaba/nacos/core/monitor/MetricsMonitorTest.java @@ -55,6 +55,44 @@ public void initMeterRegistry() { NacosMeterRegistryCenter.getMeterRegistry(NacosMeterRegistryCenter.CORE_STABLE_REGISTRY) .add(new SimpleMeterRegistry()); } + + @Test + public void testSdkServerExecutorMetric() { + MetricsMonitor.getSdkServerExecutorMetric().getPoolSize().set(1); + MetricsMonitor.getSdkServerExecutorMetric().getMaximumPoolSize().set(1); + MetricsMonitor.getSdkServerExecutorMetric().getCorePoolSize().set(1); + MetricsMonitor.getSdkServerExecutorMetric().getActiveCount().set(1); + MetricsMonitor.getSdkServerExecutorMetric().getInQueueTaskCount().set(1); + MetricsMonitor.getSdkServerExecutorMetric().getTaskCount().set(1); + MetricsMonitor.getSdkServerExecutorMetric().getCompletedTaskCount().set(1); + Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getType(), "grpcSdkServer"); + Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getPoolSize().get(), 1); + Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getMaximumPoolSize().get(), 1); + Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getCorePoolSize().get(), 1); + Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getActiveCount().get(), 1); + Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getInQueueTaskCount().get(), 1); + Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getTaskCount().get(), 1); + Assert.assertEquals(MetricsMonitor.getSdkServerExecutorMetric().getCompletedTaskCount().get(), 1); + } + + @Test + public void testClusterServerExecutorMetric() { + MetricsMonitor.getClusterServerExecutorMetric().getPoolSize().set(1); + MetricsMonitor.getClusterServerExecutorMetric().getMaximumPoolSize().set(1); + MetricsMonitor.getClusterServerExecutorMetric().getCorePoolSize().set(1); + MetricsMonitor.getClusterServerExecutorMetric().getActiveCount().set(1); + MetricsMonitor.getClusterServerExecutorMetric().getInQueueTaskCount().set(1); + MetricsMonitor.getClusterServerExecutorMetric().getTaskCount().set(1); + MetricsMonitor.getClusterServerExecutorMetric().getCompletedTaskCount().set(1); + Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getType(), "grpcClusterServer"); + Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getPoolSize().get(), 1); + Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getMaximumPoolSize().get(), 1); + Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getCorePoolSize().get(), 1); + Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getActiveCount().get(), 1); + Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getInQueueTaskCount().get(), 1); + Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getTaskCount().get(), 1); + Assert.assertEquals(MetricsMonitor.getClusterServerExecutorMetric().getCompletedTaskCount().get(), 1); + } @Test public void testGetLongConnectionMonitor() {