Add monitorType JMX #472

Open
wants to merge 1 commit into
base: main
14 changes: 14 additions & 0 deletions docs/installation.md
@@ -360,6 +360,20 @@ monitor:

Other timeout parameters are not applicable to the JDBC connection.

#### JMX

The monitor type `JMX` is an alternative way to collect cluster information, which the query-count-based routing strategy requires.
It uses the `v1/jmx/mbean` endpoint on Trino clusters.
To enable this, [JMX monitoring](https://trino.io/docs/current/admin/jmx.html) must be activated on all Trino clusters.

Ensure that a username and password are configured by adding the `backendState` section to your configuration. The credentials must be consistent across all backend clusters and have `read` rights on the `system_information`.
Review comment (Contributor):

"and have `read` rights on the `system_information`."
Is there anywhere in the Trino documentation we could link to that gives an example of how to set this up? Or we could provide a snippet of FBAC configuration.


```yaml
backendState:
  username: "user"
  password: "password"
```

#### UI_API

This pulls cluster information from the `ui/api/stats` REST endpoint. This is
@@ -0,0 +1,170 @@
/*
alaturqua marked this conversation as resolved.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.gateway.ha.clustermonitor;

import com.fasterxml.jackson.databind.JsonNode;
import io.airlift.http.client.BasicAuthRequestFilter;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpRequestFilter;
import io.airlift.http.client.JsonResponseHandler;
import io.airlift.http.client.Request;
import io.airlift.http.client.UnexpectedResponseException;
import io.airlift.log.Logger;
import io.trino.gateway.ha.config.BackendStateConfiguration;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;

import java.net.URI;
import java.util.Optional;

import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static io.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
import static io.airlift.http.client.Request.Builder.prepareGet;
import static io.airlift.json.JsonCodec.jsonCodec;
import static java.util.Objects.requireNonNull;

public class ClusterStatsJmxMonitor
        implements ClusterStatsMonitor
{
    private static final Logger log = Logger.get(ClusterStatsJmxMonitor.class);
    private static final JsonResponseHandler<JsonNode> JMX_JSON_RESPONSE_HANDLER =
Review comment (Contributor):

Create a pair of record classes for the JMX response and the attribute items, and use them instead of the generic `JsonNode` here and in `processDiscoveryNodeManagerStats` and `processQueryManagerStats`.
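
A rough sketch of what that pair of records might look like. The names `JmxResponse`, `JmxAttribute`, and `intValue` are hypothetical; only the `attributes`, `name`, and `value` fields come from the parsing code in this PR. The sketch is plain Java so that it runs on its own, and it leaves out any Jackson wiring.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical record for one entry of the "attributes" array.
// The value is kept as Object because attributes can be numbers or strings.
record JmxAttribute(String name, Object value) {}

// Hypothetical record for the response body of a single MBean query.
record JmxResponse(List<JmxAttribute> attributes)
{
    JmxResponse
    {
        // Treat a missing attribute list as empty and copy defensively
        attributes = attributes == null ? List.of() : List.copyOf(attributes);
    }

    // Returns the named attribute as an int, or empty if it is absent or not numeric
    Optional<Integer> intValue(String attributeName)
    {
        return attributes.stream()
                .filter(attribute -> attributeName.equals(attribute.name()))
                .map(JmxAttribute::value)
                .filter(Number.class::isInstance)
                .map(value -> ((Number) value).intValue())
                .findFirst();
    }
}
```

With typed records, `processQueryManagerStats` could reduce to lookups such as `response.intValue("QueuedQueries").orElse(0)`. Whether the real endpoint returns extra fields that the codec would have to ignore is not covered by this sketch.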

            createJsonResponseHandler(jsonCodec(JsonNode.class));
    private static final String JMX_PATH = "/v1/jmx/mbean";

    private final String username;
    private final String password;
    private final HttpClient client;

    public ClusterStatsJmxMonitor(HttpClient client, BackendStateConfiguration backendStateConfiguration)
    {
        this.client = requireNonNull(client, "client is null");
        this.username = backendStateConfiguration.getUsername();
        this.password = backendStateConfiguration.getPassword();
    }

    @Override
    public ClusterStats monitor(ProxyBackendConfiguration backend)
    {
        log.info("Monitoring cluster stats for backend: %s", backend.getProxyTo());
        ClusterStats.Builder clusterStats = ClusterStatsMonitor.getClusterStatsBuilder(backend);

        processJmxStats(backend, "trino.metadata:name=DiscoveryNodeManager", this::processDiscoveryNodeManagerStats, clusterStats);
        processJmxStats(backend, "trino.execution:name=QueryManager", this::processQueryManagerStats, clusterStats);

        clusterStats.proxyTo(backend.getProxyTo())
                .externalUrl(backend.getExternalUrl())
                .routingGroup(backend.getRoutingGroup());

        ClusterStats stats = clusterStats.build();
        log.debug("Completed monitoring for backend: %s. Stats: %s", backend.getProxyTo(), stats);
        return stats;
    }

    private void processJmxStats(ProxyBackendConfiguration backend, String mbeanName,
            JmxStatProcessor processor, ClusterStats.Builder clusterStats)
    {
        queryJmx(backend, mbeanName)
                .ifPresent(response -> processor.process(response, clusterStats));
Review comment (Contributor):

`queryJmx` may return `Optional.empty()` in the case of an error response. In this case `ifPresent` will not trigger and the backend stats will not be updated, so the backend will still appear healthy. You can avoid this by doing something like:

Suggested change
- .ifPresent(response -> processor.process(response, clusterStats));
+ .ifPresentOrElse(response -> processor.process(response, clusterStats), () -> clusterStats.trinoStatus(TrinoStatus.UNHEALTHY));
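
To illustrate the difference, here is a self-contained sketch of the `ifPresentOrElse` pattern. `Status`, `StatsBuilder`, and `EmptyResponseHandling` are stand-ins invented for this example; they are not the gateway's `TrinoStatus` or `ClusterStats.Builder`.

```java
import java.util.Optional;

// Stand-in for the gateway's status enum, reduced to what the example needs
enum Status { PENDING, HEALTHY, UNHEALTHY }

// Stand-in for the stats builder: mutable fields instead of builder methods
final class StatsBuilder
{
    Status status = Status.PENDING;
    int runningQueries;
}

final class EmptyResponseHandling
{
    private EmptyResponseHandling() {}

    // Applies the response when present; otherwise marks the backend unhealthy,
    // so a failed query cannot leave a stale "healthy" state behind
    static void apply(Optional<Integer> runningQueries, StatsBuilder stats)
    {
        runningQueries.ifPresentOrElse(
                value -> {
                    stats.runningQueries = value;
                    stats.status = Status.HEALTHY;
                },
                () -> stats.status = Status.UNHEALTHY);
    }
}
```

With plain `ifPresent`, the empty case would leave `status` at whatever value it had before the call.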

    }

    private void processDiscoveryNodeManagerStats(JsonNode response, ClusterStats.Builder clusterStats)
    {
        try {
            JsonNode attributes = response.get("attributes");
            for (JsonNode attribute : attributes) {
                if ("ActiveNodeCount".equals(attribute.get("name").asText())) {
                    int activeNodes = attribute.get("value").asInt();
                    TrinoStatus trinoStatus = activeNodes > 0 ? TrinoStatus.HEALTHY : TrinoStatus.UNHEALTHY;
                    clusterStats.numWorkerNodes(activeNodes)
                            .trinoStatus(trinoStatus);
                    log.debug("Processed DiscoveryNodeManager: ActiveNodeCount = %d, Health = %s",
                            activeNodes, trinoStatus);
                    break;
                }
            }
        }
        catch (Exception e) {
            log.error(e, "Error parsing DiscoveryNodeManager stats");
        }
    }

    private void processQueryManagerStats(JsonNode response, ClusterStats.Builder clusterStats)
    {
        try {
            JsonNode attributes = response.get("attributes");
            int queuedQueries = 0;
            int runningQueries = 0;
            for (JsonNode attribute : attributes) {
                String name = attribute.get("name").asText();
                if ("QueuedQueries".equals(name)) {
                    queuedQueries = attribute.get("value").asInt();
                }
                else if ("RunningQueries".equals(name)) {
                    runningQueries = attribute.get("value").asInt();
                }
            }
            clusterStats.queuedQueryCount(queuedQueries).runningQueryCount(runningQueries);
            log.debug("Processed QueryManager: QueuedQueries = %d, RunningQueries = %d", queuedQueries, runningQueries);
        }
        catch (Exception e) {
            log.error(e, "Error parsing QueryManager stats");
        }
    }

    private Optional<JsonNode> queryJmx(ProxyBackendConfiguration backend, String mbeanName)
    {
        requireNonNull(backend, "backend is null");
        requireNonNull(mbeanName, "mbeanName is null");

        String jmxUrl = backend.getProxyTo();
        Request request;

        Request preparedRequest = prepareGet()
                .setUri(uriBuilderFrom(URI.create(jmxUrl))
                        .appendPath(JMX_PATH)
                        .appendPath(mbeanName)
                        .build()
                ).addHeader("X-Trino-User", username)
                .build();

        boolean isHttps = preparedRequest.getUri().getScheme().equalsIgnoreCase("https");

        if (isHttps) {
            HttpRequestFilter filter = new BasicAuthRequestFilter(username, password);
            request = filter.filterRequest(preparedRequest);
        }
        else {
            request = preparedRequest;
        }
Comment on lines +132 to +148
Review comment (Contributor):

Add a method to create the basic auth header. I'm not sure if this is the intended use case for the `HttpRequestFilter` classes. Then you can just do

        Request.Builder preparedRequest = prepareGet()
                .setUri(uriBuilderFrom(URI.create(jmxUrl))
                        .appendPath(JMX_PATH)
                        .appendPath(mbeanName)
                        .build())
                .addHeader("X-Trino-User", username);
        if (isHttps) {
            preparedRequest.addHeader("Authorization", createBasicAuthHeader(username, password));
        }
        Request request = preparedRequest.build();

and use `request` directly.
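
One possible shape for the `createBasicAuthHeader` helper mentioned in this comment, assuming it returns the header value rather than a name/value pair. The class name `BasicAuth` is made up for the example; the encoding follows the standard Basic scheme, which is base64 of `username:password`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical holder class for the helper; not part of the PR
final class BasicAuth
{
    private BasicAuth() {}

    // Builds the value of an HTTP Authorization header for Basic authentication:
    // the literal "Basic " followed by base64("username:password")
    static String createBasicAuthHeader(String username, String password)
    {
        String credentials = username + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }
}
```

The returned value would then be attached under the `Authorization` header name on the request builder.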


        log.debug("Querying JMX at %s for %s", request.getUri(), mbeanName);

        try {
            return Optional.ofNullable(client.execute(request, JMX_JSON_RESPONSE_HANDLER));
        }
        catch (UnexpectedResponseException e) {
            log.error(e, "Failed to fetch JMX data for %s, response code: %d", mbeanName, e.getStatusCode());
            return Optional.empty();
        }
        catch (Exception e) {
            log.error(e, "Exception while querying JMX at %s", jmxUrl);
            return Optional.empty();
        }
    }

    @FunctionalInterface
    private interface JmxStatProcessor
    {
        void process(JsonNode response, ClusterStats.Builder clusterStats);
    }
}
@@ -18,5 +18,6 @@ public enum ClusterStatsMonitorType
    NOOP,
    INFO_API,
    UI_API,
-    JDBC
+    JDBC,
+    JMX
alaturqua marked this conversation as resolved.
}
@@ -20,6 +20,7 @@
import io.trino.gateway.ha.clustermonitor.ClusterStatsHttpMonitor;
import io.trino.gateway.ha.clustermonitor.ClusterStatsInfoApiMonitor;
import io.trino.gateway.ha.clustermonitor.ClusterStatsJdbcMonitor;
import io.trino.gateway.ha.clustermonitor.ClusterStatsJmxMonitor;
import io.trino.gateway.ha.clustermonitor.ClusterStatsMonitor;
import io.trino.gateway.ha.clustermonitor.ForMonitor;
import io.trino.gateway.ha.clustermonitor.NoopClusterStatsMonitor;
@@ -50,6 +51,7 @@ public ClusterStatsMonitor getClusterStatsMonitor(@ForMonitor HttpClient httpCli
            case INFO_API -> new ClusterStatsInfoApiMonitor(httpClient, config.getMonitor());
            case UI_API -> new ClusterStatsHttpMonitor(config.getBackendState());
            case JDBC -> new ClusterStatsJdbcMonitor(config.getBackendState(), config.getMonitor());
            case JMX -> new ClusterStatsJmxMonitor(httpClient, config.getBackendState());
            case NOOP -> new NoopClusterStatsMonitor();
        };
    }
@@ -39,7 +39,8 @@ final class TestClusterStatsMonitor
    void setUp()
    {
        trino = new TrinoContainer("trinodb/trino");
-        trino.withCopyFileToContainer(forClasspathResource("trino-config.properties"), "/etc/trino/config.properties");
+        trino.withCopyFileToContainer(forClasspathResource("trino-config-with-rmi.properties"), "/etc/trino/config.properties");
+        trino.withCopyFileToContainer(forClasspathResource("jvm-with-rmi.config"), "/etc/trino/jvm.config");
        trino.start();
    }

@@ -61,6 +62,12 @@ void testJdbcMonitor()
        testClusterStatsMonitor(backendStateConfiguration -> new ClusterStatsJdbcMonitor(backendStateConfiguration, new MonitorConfiguration()));
    }

    @Test
Review comment (Contributor):

Add tests for the failure cases: 400 and 500 response codes with non-JSON and invalid JSON response bodies.

    void testJmxMonitor()
    {
        testClusterStatsMonitor(backendStateConfiguration -> new ClusterStatsJmxMonitor(new JettyHttpClient(new HttpClientConfig()), backendStateConfiguration));
    }

    @Test
    void testInfoApiMonitor()
    {
17 changes: 17 additions & 0 deletions gateway-ha/src/test/resources/jvm-with-rmi.config
@@ -0,0 +1,17 @@
-server
-XX:InitialRAMPercentage=80
-XX:MaxRAMPercentage=80
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:-OmitStackTraceInFastThrow
-XX:ReservedCodeCacheSize=512M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
-Djdk.nio.maxCachedBufferSize=2000000
-Dfile.encoding=UTF-8
# Allow loading dynamic agent used by JOL
-XX:+EnableDynamicAgentLoading
-Dcom.sun.management.jmxremote.rmi.port=9081
11 changes: 11 additions & 0 deletions gateway-ha/src/test/resources/trino-config-with-rmi.properties
@@ -0,0 +1,11 @@
# COPY from https://github.com/trinodb/trino/blob/master/core/docker/default/etc/config.properties
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
discovery.uri=http://localhost:8080
catalog.management=${ENV:CATALOG_MANAGEMENT}

# Customize
http-server.process-forwarded=true
jmx.rmiregistry.port=9080
jmx.rmiserver.port=9081