From 847a9454c4fe132d0f01741cf39d9878540b8d27 Mon Sep 17 00:00:00 2001 From: Isa Inalcik Date: Wed, 20 Nov 2024 19:10:23 +0100 Subject: [PATCH] add monitorType JMX --- .../ClusterStatsJmxMonitor.java | 170 ++++++++++++++++++ .../ha/config/ClusterStatsMonitorType.java | 3 +- .../ha/module/ClusterStatsMonitorModule.java | 2 + .../TestClusterStatsMonitor.java | 9 +- .../src/test/resources/jvm-with-rmi.config | 17 ++ .../trino-config-with-rmi.properties | 11 ++ 6 files changed, 210 insertions(+), 2 deletions(-) create mode 100644 gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJmxMonitor.java create mode 100644 gateway-ha/src/test/resources/jvm-with-rmi.config create mode 100644 gateway-ha/src/test/resources/trino-config-with-rmi.properties diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJmxMonitor.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJmxMonitor.java new file mode 100644 index 000000000..f9af2c4f8 --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJmxMonitor.java @@ -0,0 +1,170 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.clustermonitor; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airlift.http.client.BasicAuthRequestFilter; +import io.airlift.http.client.HttpClient; +import io.airlift.http.client.HttpRequestFilter; +import io.airlift.http.client.JsonResponseHandler; +import io.airlift.http.client.Request; +import io.airlift.http.client.UnexpectedResponseException; +import io.airlift.log.Logger; +import io.trino.gateway.ha.config.BackendStateConfiguration; +import io.trino.gateway.ha.config.ProxyBackendConfiguration; + +import java.net.URI; +import java.util.Optional; + +import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom; +import static io.airlift.http.client.JsonResponseHandler.createJsonResponseHandler; +import static io.airlift.http.client.Request.Builder.prepareGet; +import static io.airlift.json.JsonCodec.jsonCodec; +import static java.util.Objects.requireNonNull; + +public class ClusterStatsJmxMonitor + implements ClusterStatsMonitor +{ + private static final Logger log = Logger.get(ClusterStatsJmxMonitor.class); + private static final JsonResponseHandler JMX_JSON_RESPONSE_HANDLER = + createJsonResponseHandler(jsonCodec(JsonNode.class)); + private static final String JMX_PATH = "/v1/jmx/mbean"; + + private final String username; + private final String password; + private final HttpClient client; + + public ClusterStatsJmxMonitor(HttpClient client, BackendStateConfiguration backendStateConfiguration) + { + this.client = requireNonNull(client, "client is null"); + this.username = backendStateConfiguration.getUsername(); + this.password = backendStateConfiguration.getPassword(); + } + + @Override + public ClusterStats monitor(ProxyBackendConfiguration backend) + { + log.info("Monitoring cluster stats for backend: %s", backend.getProxyTo()); + ClusterStats.Builder clusterStats = ClusterStatsMonitor.getClusterStatsBuilder(backend); + + processJmxStats(backend, "trino.metadata:name=DiscoveryNodeManager", this::processDiscoveryNodeManagerStats, clusterStats); + processJmxStats(backend, "trino.execution:name=QueryManager", this::processQueryManagerStats, clusterStats); + + clusterStats.proxyTo(backend.getProxyTo()) + .externalUrl(backend.getExternalUrl()) + .routingGroup(backend.getRoutingGroup()); + + ClusterStats stats = clusterStats.build(); + log.debug("Completed monitoring for backend: %s. Stats: %s", backend.getProxyTo(), stats); + return stats; + } + + private void processJmxStats(ProxyBackendConfiguration backend, String mbeanName, + JmxStatProcessor processor, ClusterStats.Builder clusterStats) + { + queryJmx(backend, mbeanName) + .ifPresent(response -> processor.process(response, clusterStats)); + } + + private void processDiscoveryNodeManagerStats(JsonNode response, ClusterStats.Builder clusterStats) + { + try { + JsonNode attributes = response.get("attributes"); + for (JsonNode attribute : attributes) { + if ("ActiveNodeCount".equals(attribute.get("name").asText())) { + int activeNodes = attribute.get("value").asInt(); + TrinoStatus trinoStatus = activeNodes > 0 ? TrinoStatus.HEALTHY : TrinoStatus.UNHEALTHY; + clusterStats.numWorkerNodes(activeNodes) + .trinoStatus(trinoStatus); + log.debug("Processed DiscoveryNodeManager: ActiveNodeCount = %d, Health = %s", + activeNodes, trinoStatus); + break; + } + } + } + catch (Exception e) { + log.error(e, "Error parsing DiscoveryNodeManager stats"); + } + } + + private void processQueryManagerStats(JsonNode response, ClusterStats.Builder clusterStats) + { + try { + JsonNode attributes = response.get("attributes"); + int queuedQueries = 0; + int runningQueries = 0; + for (JsonNode attribute : attributes) { + String name = attribute.get("name").asText(); + if ("QueuedQueries".equals(name)) { + queuedQueries = attribute.get("value").asInt(); + } + else if ("RunningQueries".equals(name)) { + runningQueries = attribute.get("value").asInt(); + } + } + clusterStats.queuedQueryCount(queuedQueries).runningQueryCount(runningQueries); + log.debug("Processed QueryManager: QueuedQueries = %d, RunningQueries = %d", queuedQueries, runningQueries); + } + catch (Exception e) { + log.error(e, "Error parsing QueryManager stats"); + } + } + + private Optional queryJmx(ProxyBackendConfiguration backend, String mbeanName) + { + requireNonNull(backend, "backend is null"); + requireNonNull(mbeanName, "mbeanName is null"); + + String jmxUrl = backend.getProxyTo(); + Request request; + + Request preparedRequest = prepareGet() + .setUri(uriBuilderFrom(URI.create(jmxUrl)) + .appendPath(JMX_PATH) + .appendPath(mbeanName) + .build() + ).addHeader("X-Trino-User", username) + .build(); + + boolean isHttps = preparedRequest.getUri().getScheme().equalsIgnoreCase("https"); + + if (isHttps) { + HttpRequestFilter filter = new BasicAuthRequestFilter(username, password); + request = filter.filterRequest(preparedRequest); + } + else { + request = preparedRequest; + } + + log.debug("Querying JMX at %s for %s", request.getUri(), mbeanName); + + try { + return Optional.ofNullable(client.execute(request, JMX_JSON_RESPONSE_HANDLER)); + } + catch (UnexpectedResponseException e) { + log.error(e, "Failed to fetch JMX data for %s, response code: %d", mbeanName, e.getStatusCode()); + return Optional.empty(); + } + catch (Exception e) { + log.error(e, "Exception while querying JMX at %s", jmxUrl); + return Optional.empty(); + } + } + + @FunctionalInterface + private interface JmxStatProcessor + { + void process(JsonNode response, ClusterStats.Builder clusterStats); + } +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterStatsMonitorType.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterStatsMonitorType.java index ad7835bd2..ae44536af 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterStatsMonitorType.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterStatsMonitorType.java @@ -18,5 +18,6 @@ public enum ClusterStatsMonitorType NOOP, INFO_API, UI_API, - JDBC + JDBC, + JMX } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterStatsMonitorModule.java b/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterStatsMonitorModule.java index b92ce01e9..cb1b3a1df 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterStatsMonitorModule.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterStatsMonitorModule.java @@ -20,6 +20,7 @@ import io.trino.gateway.ha.clustermonitor.ClusterStatsHttpMonitor; import io.trino.gateway.ha.clustermonitor.ClusterStatsInfoApiMonitor; import io.trino.gateway.ha.clustermonitor.ClusterStatsJdbcMonitor; +import io.trino.gateway.ha.clustermonitor.ClusterStatsJmxMonitor; import io.trino.gateway.ha.clustermonitor.ClusterStatsMonitor; import io.trino.gateway.ha.clustermonitor.ForMonitor; import io.trino.gateway.ha.clustermonitor.NoopClusterStatsMonitor; @@ -50,6 +51,7 @@ public ClusterStatsMonitor getClusterStatsMonitor(@ForMonitor HttpClient httpCli case INFO_API -> new ClusterStatsInfoApiMonitor(httpClient, config.getMonitor()); case UI_API -> new ClusterStatsHttpMonitor(config.getBackendState()); case JDBC -> new ClusterStatsJdbcMonitor(config.getBackendState(), config.getMonitor()); + case JMX -> new ClusterStatsJmxMonitor(httpClient, config.getBackendState()); case NOOP -> new NoopClusterStatsMonitor(); }; } diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterStatsMonitor.java b/gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterStatsMonitor.java index 3a9ab15be..f9e3f3d6c 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterStatsMonitor.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterStatsMonitor.java @@ -39,7 +39,8 @@ final class TestClusterStatsMonitor void setUp() { trino = new TrinoContainer("trinodb/trino"); - trino.withCopyFileToContainer(forClasspathResource("trino-config.properties"), "/etc/trino/config.properties"); + trino.withCopyFileToContainer(forClasspathResource("trino-config-with-rmi.properties"), "/etc/trino/config.properties"); + trino.withCopyFileToContainer(forClasspathResource("jvm-with-rmi.config"), "/etc/trino/jvm.config"); trino.start(); } @@ -61,6 +62,12 @@ void testJdbcMonitor() testClusterStatsMonitor(backendStateConfiguration -> new ClusterStatsJdbcMonitor(backendStateConfiguration, new MonitorConfiguration())); } + @Test + void testJmxMonitor() + { + testClusterStatsMonitor(backendStateConfiguration -> new ClusterStatsJmxMonitor(new JettyHttpClient(new HttpClientConfig()), backendStateConfiguration)); + } + @Test void testInfoApiMonitor() { diff --git a/gateway-ha/src/test/resources/jvm-with-rmi.config b/gateway-ha/src/test/resources/jvm-with-rmi.config new file mode 100644 index 000000000..1500ea0d0 --- /dev/null +++ b/gateway-ha/src/test/resources/jvm-with-rmi.config @@ -0,0 +1,17 @@ +-server +-XX:InitialRAMPercentage=80 +-XX:MaxRAMPercentage=80 +-XX:G1HeapRegionSize=32M +-XX:+ExplicitGCInvokesConcurrent +-XX:+ExitOnOutOfMemoryError +-XX:+HeapDumpOnOutOfMemoryError +-XX:-OmitStackTraceInFastThrow +-XX:ReservedCodeCacheSize=512M +-XX:PerMethodRecompilationCutoff=10000 +-XX:PerBytecodeRecompilationCutoff=10000 +-Djdk.attach.allowAttachSelf=true +-Djdk.nio.maxCachedBufferSize=2000000 +-Dfile.encoding=UTF-8 +# Allow loading dynamic agent used by JOL +-XX:+EnableDynamicAgentLoading +-Dcom.sun.management.jmxremote.rmi.port=9081 diff --git a/gateway-ha/src/test/resources/trino-config-with-rmi.properties b/gateway-ha/src/test/resources/trino-config-with-rmi.properties new file mode 100644 index 000000000..064b705c5 --- /dev/null +++ b/gateway-ha/src/test/resources/trino-config-with-rmi.properties @@ -0,0 +1,11 @@ +# COPY from https://github.com/trinodb/trino/blob/master/core/docker/default/etc/config.properties +coordinator=true +node-scheduler.include-coordinator=true +http-server.http.port=8080 +discovery.uri=http://localhost:8080 +catalog.management=${ENV:CATALOG_MANAGEMENT} + +# Customize +http-server.process-forwarded=true +jmx.rmiregistry.port=9080 +jmx.rmiserver.port=9081