-
Notifications
You must be signed in to change notification settings - Fork 77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add monitorType JMX #472
base: main
Are you sure you want to change the base?
Add monitorType JMX #472
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,170 @@ | ||||||
/* | ||||||
alaturqua marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
* you may not use this file except in compliance with the License. | ||||||
* You may obtain a copy of the License at | ||||||
* | ||||||
* http://www.apache.org/licenses/LICENSE-2.0 | ||||||
* | ||||||
* Unless required by applicable law or agreed to in writing, software | ||||||
* distributed under the License is distributed on an "AS IS" BASIS, | ||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
* See the License for the specific language governing permissions and | ||||||
* limitations under the License. | ||||||
*/ | ||||||
package io.trino.gateway.ha.clustermonitor; | ||||||
|
||||||
import com.fasterxml.jackson.databind.JsonNode; | ||||||
import io.airlift.http.client.BasicAuthRequestFilter; | ||||||
import io.airlift.http.client.HttpClient; | ||||||
import io.airlift.http.client.HttpRequestFilter; | ||||||
import io.airlift.http.client.JsonResponseHandler; | ||||||
import io.airlift.http.client.Request; | ||||||
import io.airlift.http.client.UnexpectedResponseException; | ||||||
import io.airlift.log.Logger; | ||||||
import io.trino.gateway.ha.config.BackendStateConfiguration; | ||||||
import io.trino.gateway.ha.config.ProxyBackendConfiguration; | ||||||
|
||||||
import java.net.URI; | ||||||
import java.util.Optional; | ||||||
|
||||||
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom; | ||||||
import static io.airlift.http.client.JsonResponseHandler.createJsonResponseHandler; | ||||||
import static io.airlift.http.client.Request.Builder.prepareGet; | ||||||
import static io.airlift.json.JsonCodec.jsonCodec; | ||||||
import static java.util.Objects.requireNonNull; | ||||||
|
||||||
public class ClusterStatsJmxMonitor | ||||||
implements ClusterStatsMonitor | ||||||
{ | ||||||
private static final Logger log = Logger.get(ClusterStatsJmxMonitor.class); | ||||||
private static final JsonResponseHandler<JsonNode> JMX_JSON_RESPONSE_HANDLER = | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. create a pair of |
||||||
createJsonResponseHandler(jsonCodec(JsonNode.class)); | ||||||
private static final String JMX_PATH = "/v1/jmx/mbean"; | ||||||
|
||||||
private final String username; | ||||||
private final String password; | ||||||
private final HttpClient client; | ||||||
|
||||||
public ClusterStatsJmxMonitor(HttpClient client, BackendStateConfiguration backendStateConfiguration) | ||||||
{ | ||||||
this.client = requireNonNull(client, "client is null"); | ||||||
this.username = backendStateConfiguration.getUsername(); | ||||||
this.password = backendStateConfiguration.getPassword(); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public ClusterStats monitor(ProxyBackendConfiguration backend) | ||||||
{ | ||||||
log.info("Monitoring cluster stats for backend: %s", backend.getProxyTo()); | ||||||
ClusterStats.Builder clusterStats = ClusterStatsMonitor.getClusterStatsBuilder(backend); | ||||||
|
||||||
processJmxStats(backend, "trino.metadata:name=DiscoveryNodeManager", this::processDiscoveryNodeManagerStats, clusterStats); | ||||||
processJmxStats(backend, "trino.execution:name=QueryManager", this::processQueryManagerStats, clusterStats); | ||||||
|
||||||
clusterStats.proxyTo(backend.getProxyTo()) | ||||||
.externalUrl(backend.getExternalUrl()) | ||||||
.routingGroup(backend.getRoutingGroup()); | ||||||
|
||||||
ClusterStats stats = clusterStats.build(); | ||||||
log.debug("Completed monitoring for backend: %s. Stats: %s", backend.getProxyTo(), stats); | ||||||
return stats; | ||||||
} | ||||||
|
||||||
private void processJmxStats(ProxyBackendConfiguration backend, String mbeanName, | ||||||
JmxStatProcessor processor, ClusterStats.Builder clusterStats) | ||||||
{ | ||||||
queryJmx(backend, mbeanName) | ||||||
.ifPresent(response -> processor.process(response, clusterStats)); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
|
||||||
private void processDiscoveryNodeManagerStats(JsonNode response, ClusterStats.Builder clusterStats) | ||||||
{ | ||||||
try { | ||||||
JsonNode attributes = response.get("attributes"); | ||||||
for (JsonNode attribute : attributes) { | ||||||
if ("ActiveNodeCount".equals(attribute.get("name").asText())) { | ||||||
int activeNodes = attribute.get("value").asInt(); | ||||||
TrinoStatus trinoStatus = activeNodes > 0 ? TrinoStatus.HEALTHY : TrinoStatus.UNHEALTHY; | ||||||
clusterStats.numWorkerNodes(activeNodes) | ||||||
.trinoStatus(trinoStatus); | ||||||
log.debug("Processed DiscoveryNodeManager: ActiveNodeCount = %d, Health = %s", | ||||||
activeNodes, trinoStatus); | ||||||
break; | ||||||
} | ||||||
} | ||||||
} | ||||||
catch (Exception e) { | ||||||
log.error(e, "Error parsing DiscoveryNodeManager stats"); | ||||||
} | ||||||
} | ||||||
|
||||||
private void processQueryManagerStats(JsonNode response, ClusterStats.Builder clusterStats) | ||||||
{ | ||||||
try { | ||||||
JsonNode attributes = response.get("attributes"); | ||||||
int queuedQueries = 0; | ||||||
int runningQueries = 0; | ||||||
for (JsonNode attribute : attributes) { | ||||||
String name = attribute.get("name").asText(); | ||||||
if ("QueuedQueries".equals(name)) { | ||||||
queuedQueries = attribute.get("value").asInt(); | ||||||
} | ||||||
else if ("RunningQueries".equals(name)) { | ||||||
runningQueries = attribute.get("value").asInt(); | ||||||
} | ||||||
} | ||||||
clusterStats.queuedQueryCount(queuedQueries).runningQueryCount(runningQueries); | ||||||
log.debug("Processed QueryManager: QueuedQueries = %d, RunningQueries = %d", queuedQueries, runningQueries); | ||||||
} | ||||||
catch (Exception e) { | ||||||
log.error(e, "Error parsing QueryManager stats"); | ||||||
} | ||||||
} | ||||||
|
||||||
private Optional<JsonNode> queryJmx(ProxyBackendConfiguration backend, String mbeanName) | ||||||
{ | ||||||
requireNonNull(backend, "backend is null"); | ||||||
requireNonNull(mbeanName, "mbeanName is null"); | ||||||
|
||||||
String jmxUrl = backend.getProxyTo(); | ||||||
Request request; | ||||||
|
||||||
Request preparedRequest = prepareGet() | ||||||
.setUri(uriBuilderFrom(URI.create(jmxUrl)) | ||||||
.appendPath(JMX_PATH) | ||||||
.appendPath(mbeanName) | ||||||
.build() | ||||||
).addHeader("X-Trino-User", username) | ||||||
.build(); | ||||||
|
||||||
boolean isHttps = preparedRequest.getUri().getScheme().equalsIgnoreCase("https"); | ||||||
|
||||||
if (isHttps) { | ||||||
HttpRequestFilter filter = new BasicAuthRequestFilter(username, password); | ||||||
request = filter.filterRequest(preparedRequest); | ||||||
} | ||||||
else { | ||||||
request = preparedRequest; | ||||||
} | ||||||
Comment on lines
+132
to
+148
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a method to create the basic auth header. I'm not sure if this is the intended use case for the HttpRequestFilter classes. Then you can just
and use |
||||||
|
||||||
log.debug("Querying JMX at %s for %s", request.getUri(), mbeanName); | ||||||
|
||||||
try { | ||||||
return Optional.ofNullable(client.execute(request, JMX_JSON_RESPONSE_HANDLER)); | ||||||
} | ||||||
catch (UnexpectedResponseException e) { | ||||||
log.error(e, "Failed to fetch JMX data for %s, response code: %d", mbeanName, e.getStatusCode()); | ||||||
return Optional.empty(); | ||||||
} | ||||||
catch (Exception e) { | ||||||
log.error(e, "Exception while querying JMX at %s", jmxUrl); | ||||||
return Optional.empty(); | ||||||
} | ||||||
} | ||||||
|
||||||
@FunctionalInterface | ||||||
private interface JmxStatProcessor | ||||||
{ | ||||||
void process(JsonNode response, ClusterStats.Builder clusterStats); | ||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,7 +39,8 @@ final class TestClusterStatsMonitor | |
void setUp() | ||
{ | ||
trino = new TrinoContainer("trinodb/trino"); | ||
trino.withCopyFileToContainer(forClasspathResource("trino-config.properties"), "/etc/trino/config.properties"); | ||
trino.withCopyFileToContainer(forClasspathResource("trino-config-with-rmi.properties"), "/etc/trino/config.properties"); | ||
trino.withCopyFileToContainer(forClasspathResource("jvm-with-rmi.config"), "/etc/trino/jvm.config"); | ||
trino.start(); | ||
} | ||
|
||
|
@@ -61,6 +62,12 @@ void testJdbcMonitor() | |
testClusterStatsMonitor(backendStateConfiguration -> new ClusterStatsJdbcMonitor(backendStateConfiguration, new MonitorConfiguration())); | ||
} | ||
|
||
@Test | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add tests for the failure cases, 400 and 500 response codes with non json and invalid json response bodies |
||
void testJmxMonitor() | ||
{ | ||
testClusterStatsMonitor(backendStateConfiguration -> new ClusterStatsJmxMonitor(new JettyHttpClient(new HttpClientConfig()), backendStateConfiguration)); | ||
} | ||
|
||
@Test | ||
void testInfoApiMonitor() | ||
{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
-server | ||
-XX:InitialRAMPercentage=80 | ||
-XX:MaxRAMPercentage=80 | ||
-XX:G1HeapRegionSize=32M | ||
-XX:+ExplicitGCInvokesConcurrent | ||
-XX:+ExitOnOutOfMemoryError | ||
-XX:+HeapDumpOnOutOfMemoryError | ||
-XX:-OmitStackTraceInFastThrow | ||
-XX:ReservedCodeCacheSize=512M | ||
-XX:PerMethodRecompilationCutoff=10000 | ||
-XX:PerBytecodeRecompilationCutoff=10000 | ||
-Djdk.attach.allowAttachSelf=true | ||
-Djdk.nio.maxCachedBufferSize=2000000 | ||
-Dfile.encoding=UTF-8 | ||
# Allow loading dynamic agent used by JOL | ||
-XX:+EnableDynamicAgentLoading | ||
-Dcom.sun.management.jmxremote.rmi.port=9081 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
# COPY from https://github.com/trinodb/trino/blob/master/core/docker/default/etc/config.properties | ||
coordinator=true | ||
node-scheduler.include-coordinator=true | ||
http-server.http.port=8080 | ||
discovery.uri=http://localhost:8080 | ||
catalog.management=${ENV:CATALOG_MANAGEMENT} | ||
|
||
# Customize | ||
http-server.process-forwarded=true | ||
jmx.rmiregistry.port=9080 | ||
jmx.rmiserver.port=9081 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.