From e023c1a60de8d1f0489b797adf1b25438002a820 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Wed, 30 Aug 2023 18:36:13 +0300 Subject: [PATCH 01/10] Add compaction stats, add metrics to the JobExecutor (but not exposed yet) and modify some methods to be more reusable --- .../datastax/mgmtapi/util/JobExecutor.java | 11 +- .../builder/CassandraMetricNameParser.java | 7 + .../CassandraMetricRegistryListener.java | 6 +- .../interceptors/MetricsInterceptor.java | 11 +- .../prometheus/CassandraTasksExports.java | 135 ++++++++++++++++++ 5 files changed, 158 insertions(+), 12 deletions(-) create mode 100644 management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java diff --git a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java index f51e4f9e..2e086a15 100644 --- a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java +++ b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java @@ -10,11 +10,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.cassandra.utils.Pair; public class JobExecutor { ExecutorService executorService = Executors.newFixedThreadPool(1); - Cache jobCache = CacheBuilder.newBuilder().maximumSize(1000).build(); + Cache jobCache = CacheBuilder.newBuilder().recordStats().maximumSize(1000).build(); public Pair> submit(String jobType, Runnable runnable) { // Where do I create the job details? Here? Add it to the Cache first? @@ -45,4 +46,12 @@ public Pair> submit(String jobType, Runnable run public Job getJobWithId(String jobId) { return jobCache.getIfPresent(jobId); } + + public int runningTasks() { + return ((ThreadPoolExecutor) executorService).getActiveCount(); + } + + public int queuedTasks() { + return ((ThreadPoolExecutor) executorService).getQueue().size(); + } } diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricNameParser.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricNameParser.java index 5bdd3d21..fa289dd0 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricNameParser.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricNameParser.java @@ -42,6 +42,13 @@ public CassandraMetricNameParser( } } + public static CassandraMetricNameParser getDefaultParser(Configuration config) { + return new CassandraMetricNameParser( + CassandraMetricsTools.DEFAULT_LABEL_NAMES, + CassandraMetricsTools.DEFAULT_LABEL_VALUES, + config); + } + private void parseEnvVariablesAsLabels(Map envSettings) { for (Map.Entry entry : envSettings.entrySet()) { String envValue = System.getenv(entry.getValue()); diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java index 7e8b515e..30c0eb00 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java @@ -77,11 +77,7 @@ public class CassandraMetricRegistryListener implements MetricRegistryListener { public CassandraMetricRegistryListener( ConcurrentHashMap familyCache, Configuration config) throws NoSuchMethodException { - parser = - new CassandraMetricNameParser( - CassandraMetricsTools.DEFAULT_LABEL_NAMES, - CassandraMetricsTools.DEFAULT_LABEL_VALUES, - config); + parser = CassandraMetricNameParser.getDefaultParser(config); cache = new ConcurrentHashMap<>(); this.familyCache = familyCache; diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java index 18512f34..f2a16a7a 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java @@ -9,6 +9,7 @@ import io.k8ssandra.metrics.config.Configuration; import io.k8ssandra.metrics.http.NettyMetricsHttpServer; import io.k8ssandra.metrics.prometheus.CassandraDropwizardExports; +import io.k8ssandra.metrics.prometheus.CassandraTasksExports; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.prometheus.client.hotspot.DefaultExports; @@ -72,6 +73,9 @@ public static void intercept(@SuperCall Callable zuper) throws Exception { // Add JVM metrics DefaultExports.initialize(); + // Add task metrics + new CassandraTasksExports(CassandraMetricsRegistry.Metrics, config).register(); + // Create /metrics handler. Note, this doesn't support larger than nThreads=1 final EventLoopGroup httpGroup = new EpollEventLoopGroup(1); @@ -81,12 +85,7 @@ public static void intercept(@SuperCall Callable zuper) throws Exception { logger.info("Metrics collector started"); - Runtime.getRuntime() - .addShutdownHook( - new Thread( - () -> { - httpGroup.shutdownGracefully(); - })); + Runtime.getRuntime().addShutdownHook(new Thread(httpGroup::shutdownGracefully)); } catch (Throwable t) { logger.error("Unable to start metrics endpoint", t); } diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java new file mode 100644 index 00000000..71628bd1 --- /dev/null +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java @@ -0,0 +1,135 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package io.k8ssandra.metrics.prometheus; + +import com.codahale.metrics.MetricRegistry; +import com.datastax.mgmtapi.ShimLoader; +import com.google.common.collect.Lists; +import io.k8ssandra.metrics.builder.CassandraMetricDefinition; +import io.k8ssandra.metrics.builder.CassandraMetricNameParser; +import io.k8ssandra.metrics.config.Configuration; +import io.prometheus.client.Collector; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.cassandra.db.compaction.OperationType; +import org.slf4j.LoggerFactory; + +/** + * Collect non-metrics information from Cassandra and turn them to metrics. This is considerably + * slower to collect than the metrics we currently ship. + */ +public class CassandraTasksExports extends Collector implements Collector.Describable { + private static final org.slf4j.Logger logger = + LoggerFactory.getLogger(CassandraTasksExports.class); + + private final MetricRegistry registry; + + private final CassandraMetricNameParser parser; + + public CassandraTasksExports(MetricRegistry registry, Configuration config) { + this.registry = registry; + parser = CassandraMetricNameParser.getDefaultParser(config); + } + + @Override + public List collect() { + + ArrayList familySamples = Lists.newArrayList(); + + // Collect Compaction Task metrics + familySamples.addAll(getCompactionStats()); + + // Collect active streaming sessions + + // Collect other sstableOperations (if not part of Compactions metrics already) + + // Collect JobExecutor tasks + + // Collect MBean ones not exposed currently in CassandraMetrics / 3.11 + + return familySamples; + } + + @Override + public List describe() { + return new ArrayList<>(); + } + + List getCompactionStats() { + + // Cassandra's internal CompactionMetrics are close to what we want, but not exactly. + // And we can't access CompactionManager.getMetrics() to get them in 3.11 + List> compactions = + ShimLoader.instance.get().getCompactionManager().getCompactions().stream() + .filter( + c -> { + String taskType = c.get("taskType"); + OperationType operationType = OperationType.valueOf(taskType); + return operationType != OperationType.COUNTER_CACHE_SAVE + && operationType != OperationType.KEY_CACHE_SAVE + && operationType != OperationType.ROW_CACHE_SAVE; + }) + .collect(Collectors.toList()); + + // Ignore taskTypes: COUNTER_CACHE_SAVE, KEY_CACHE_SAVE, ROW_CACHE_SAVE (from Cassandra 4.1) + ArrayList additionalLabels = + Lists.newArrayList("keyspace", "table", "compaction_id", "unit", "type"); + + // These should be escape handled + CassandraMetricDefinition protoCompleted = + parser.parseDropwizardMetric( + "org_apache_cassandra_metrics_extended_compaction_stats_completed", + "", + additionalLabels, + null); + + CassandraMetricDefinition protoTotal = + parser.parseDropwizardMetric( + "org_apache_cassandra_metrics_extended_compaction_stats_total", + "", + additionalLabels, + null); + + List completedSamples = new ArrayList<>(compactions.size() * 2); + List totalSamples = new ArrayList<>(compactions.size() * 2); + for (Map c : compactions) { + ArrayList labelValues = + Lists.newArrayList( + c.get("keyspace"), + c.get("columnfamily"), + c.get("compactionId"), + c.get("unit"), + c.get("taskType")); + + Collector.MetricFamilySamples.Sample completeSample = + new Collector.MetricFamilySamples.Sample( + protoCompleted.getMetricName(), + protoCompleted.getLabelNames(), + labelValues, + Double.parseDouble(c.get("completed"))); + + Collector.MetricFamilySamples.Sample totalSample = + new Collector.MetricFamilySamples.Sample( + protoCompleted.getMetricName(), + protoCompleted.getLabelNames(), + labelValues, + Double.parseDouble(c.get("total"))); + + completedSamples.add(completeSample); + totalSamples.add(totalSample); + } + + MetricFamilySamples completeFamily = + new MetricFamilySamples(protoCompleted.getMetricName(), Type.GAUGE, "", completedSamples); + + MetricFamilySamples totalFamily = + new MetricFamilySamples(protoTotal.getMetricName(), Type.GAUGE, "", totalSamples); + + return Lists.newArrayList(completeFamily, totalFamily); + } +} From 781b5fab7daf5ae0d92888d68f5104ece64a6aeb Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Wed, 30 Aug 2023 18:48:07 +0300 Subject: [PATCH 02/10] Add parameter to allow disabling gather of extended metrics --- .../io/k8ssandra/metrics/config/Configuration.java | 11 +++++++++++ .../metrics/interceptors/MetricsInterceptor.java | 4 +++- .../src/main/resources/default-metric-settings.yaml | 1 + .../io/k8ssandra/metrics/config/ConfigReaderTest.java | 5 +++++ .../src/test/resources/collector_tls.yaml | 1 + 5 files changed, 21 insertions(+), 1 deletion(-) diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/config/Configuration.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/config/Configuration.java index a6aa08db..094f9795 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/config/Configuration.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/config/Configuration.java @@ -21,6 +21,9 @@ public class Configuration { @JsonProperty("labels") private LabelConfiguration labels; + @JsonProperty("extended_metrics_disabled") + private boolean extendedDisabled; + public Configuration() { relabels = new ArrayList<>(); } @@ -44,4 +47,12 @@ public void setRelabels(List relabels) { public void setLabels(LabelConfiguration labels) { this.labels = labels; } + + public boolean isExtendedDisabled() { + return extendedDisabled; + } + + public void setExtendedDisabled(boolean extendedDisabled) { + this.extendedDisabled = extendedDisabled; + } } diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java index f2a16a7a..9d1869c3 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java @@ -74,7 +74,9 @@ public static void intercept(@SuperCall Callable zuper) throws Exception { DefaultExports.initialize(); // Add task metrics - new CassandraTasksExports(CassandraMetricsRegistry.Metrics, config).register(); + if(!config.isExtendedDisabled()) { + new CassandraTasksExports(CassandraMetricsRegistry.Metrics, config).register(); + } // Create /metrics handler. Note, this doesn't support larger than nThreads=1 final EventLoopGroup httpGroup = new EpollEventLoopGroup(1); diff --git a/management-api-agent-common/src/main/resources/default-metric-settings.yaml b/management-api-agent-common/src/main/resources/default-metric-settings.yaml index c91a54ee..1890e305 100644 --- a/management-api-agent-common/src/main/resources/default-metric-settings.yaml +++ b/management-api-agent-common/src/main/resources/default-metric-settings.yaml @@ -163,3 +163,4 @@ labels: env: pod_name: "POD_NAME" node_name: "NODE_NAME" +extended_metrics_disabled: false diff --git a/management-api-agent-common/src/test/java/io/k8ssandra/metrics/config/ConfigReaderTest.java b/management-api-agent-common/src/test/java/io/k8ssandra/metrics/config/ConfigReaderTest.java index b4c03b76..3c0b9177 100644 --- a/management-api-agent-common/src/test/java/io/k8ssandra/metrics/config/ConfigReaderTest.java +++ b/management-api-agent-common/src/test/java/io/k8ssandra/metrics/config/ConfigReaderTest.java @@ -6,8 +6,10 @@ package io.k8ssandra.metrics.config; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.net.URL; import org.junit.Test; @@ -21,6 +23,7 @@ public void readEmptyConfig() { Configuration configuration = ConfigReader.readCustomConfig(); assertEquals(0, configuration.getRelabels().size()); assertNull(configuration.getEndpointConfiguration()); + assertFalse(configuration.isExtendedDisabled()); } @Test @@ -58,6 +61,8 @@ public void readFromTLSConfigFile() { assertEquals("/etc/ssl/ca.crt", tlsConfig.getCaCertPath()); assertEquals("/etc/ssl/tls.crt", tlsConfig.getTlsCertPath()); assertEquals("/etc/ssl/tls.key", tlsConfig.getTlsKeyPath()); + + assertTrue(configuration.isExtendedDisabled()); } @Test diff --git a/management-api-agent-common/src/test/resources/collector_tls.yaml b/management-api-agent-common/src/test/resources/collector_tls.yaml index 941d6f34..0225f07d 100644 --- a/management-api-agent-common/src/test/resources/collector_tls.yaml +++ b/management-api-agent-common/src/test/resources/collector_tls.yaml @@ -13,3 +13,4 @@ relabels: separator: "," regex: "^(a|b|c),.*" action: "drop" +extended_metrics_disabled: true \ No newline at end of file From 7da5c260857949cb388229f3abdc98ce7032f877 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Thu, 31 Aug 2023 17:54:19 +0300 Subject: [PATCH 03/10] Add StreamInfo metrics --- .../interceptors/MetricsInterceptor.java | 2 +- .../prometheus/CassandraTasksExports.java | 204 +++++++++++++++++- 2 files changed, 196 insertions(+), 10 deletions(-) diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java index 9d1869c3..c4f35938 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java @@ -74,7 +74,7 @@ public static void intercept(@SuperCall Callable zuper) throws Exception { DefaultExports.initialize(); // Add task metrics - if(!config.isExtendedDisabled()) { + if (!config.isExtendedDisabled()) { new CassandraTasksExports(CassandraMetricsRegistry.Metrics, config).register(); } diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java index 71628bd1..bf871ced 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java @@ -27,6 +27,7 @@ public class CassandraTasksExports extends Collector implements Collector.Descri private static final org.slf4j.Logger logger = LoggerFactory.getLogger(CassandraTasksExports.class); + private static final String METRICS_PREFIX = "org_apache_cassandra_metrics_extended_"; private final MetricRegistry registry; private final CassandraMetricNameParser parser; @@ -45,6 +46,7 @@ public List collect() { familySamples.addAll(getCompactionStats()); // Collect active streaming sessions + familySamples.addAll(getStreamInfoStats()); // Collect other sstableOperations (if not part of Compactions metrics already) @@ -60,6 +62,196 @@ public List describe() { return new ArrayList<>(); } + List getStreamInfoStats() { + ArrayList additionalLabels = + Lists.newArrayList("plan_id", "operation", "peer", "connection"); + + // These should be EA targets, 8 metrics to create + CassandraMetricDefinition filesToReceive = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_files_to_receive", "", additionalLabels, null); + + CassandraMetricDefinition filesReceived = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_files_received", "", additionalLabels, null); + + CassandraMetricDefinition sizeToReceive = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_size_to_receive", "", additionalLabels, null); + + CassandraMetricDefinition sizeReceived = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_size_received", "", additionalLabels, null); + + CassandraMetricDefinition filesToSend = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_files_to_send", "", additionalLabels, null); + + CassandraMetricDefinition filesSent = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_files_sent", "", additionalLabels, null); + + CassandraMetricDefinition sizeToSend = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_size_to_send", "", additionalLabels, null); + + CassandraMetricDefinition sizeSent = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_size_sent", "", additionalLabels, null); + + // This is a lot simpler code without all the casting back and forth if description was in the + // same place for + // 3.11 and 4.x. Simplify this once 3.11 support is dropped to use + // StreamManager.instance.getCurrentStreams() .. + + List>>> streamInfos = + ShimLoader.instance.get().getStreamInfo(); + + List totalFilesToReceiveSamples = new ArrayList<>(); + List totalFilesReceivedSamples = new ArrayList<>(); + List totalSizeToReceiveSamples = new ArrayList<>(); + List totalSizeReceivedSamples = new ArrayList<>(); + List totalFilesToSendSamples = new ArrayList<>(); + List totalFilesSentSamples = new ArrayList<>(); + List totalSizeToSendSamples = new ArrayList<>(); + List totalSizeSentSamples = new ArrayList<>(); + + for (Map>> streamInfo : streamInfos) { + for (Map.Entry>> sessionResults : streamInfo.entrySet()) { + String planId = sessionResults.getKey(); + for (Map session : sessionResults.getValue()) { + ArrayList labelValues = + Lists.newArrayList( + planId, + session.get("STREAM_OPERATION"), + session.get("PEER"), + session.get("USING_CONNECTION")); + + long totalFilesToReceive = Long.parseLong(session.get("TOTAL_FILES_TO_RECEIVE")); + long totalFilesReceived = Long.parseLong(session.get("TOTAL_FILES_RECEIVED")); + long totalSizeToReceive = Long.parseLong(session.get("TOTAL_SIZE_TO_RECEIVE")); + long totalSizeReceived = Long.parseLong(session.get("TOTAL_SIZE_RECEIVED")); + long totalFilesToSend = Long.parseLong(session.get("TOTAL_FILES_TO_SEND")); + long totalFilesSent = Long.parseLong(session.get("TOTAL_FILES_SENT")); + long totalSizeToSend = Long.parseLong(session.get("TOTAL_SIZE_TO_SEND")); + long totalSizeSent = Long.parseLong(session.get("TOTAL_SIZE_SENT")); + + // Receive samples + Collector.MetricFamilySamples.Sample totalFilesToReceiveSample = + new Collector.MetricFamilySamples.Sample( + filesToReceive.getMetricName(), + filesToReceive.getLabelNames(), + labelValues, + totalFilesToReceive); + + totalFilesToReceiveSamples.add(totalFilesToReceiveSample); + + Collector.MetricFamilySamples.Sample totalFilesReceivedSample = + new Collector.MetricFamilySamples.Sample( + filesReceived.getMetricName(), + filesReceived.getLabelNames(), + labelValues, + totalFilesReceived); + + totalFilesReceivedSamples.add(totalFilesReceivedSample); + + Collector.MetricFamilySamples.Sample totalSizeToReceiveSample = + new Collector.MetricFamilySamples.Sample( + sizeToReceive.getMetricName(), + sizeToReceive.getLabelNames(), + labelValues, + totalSizeToReceive); + + totalSizeToReceiveSamples.add(totalSizeToReceiveSample); + + Collector.MetricFamilySamples.Sample totalSizeReceivedSample = + new Collector.MetricFamilySamples.Sample( + sizeReceived.getMetricName(), + sizeReceived.getLabelNames(), + labelValues, + totalSizeReceived); + + totalSizeReceivedSamples.add(totalSizeReceivedSample); + + // Send samples + Collector.MetricFamilySamples.Sample totalFilesToSendSample = + new Collector.MetricFamilySamples.Sample( + filesToSend.getMetricName(), + filesToSend.getLabelNames(), + labelValues, + totalFilesToSend); + + totalFilesToSendSamples.add(totalFilesToSendSample); + + Collector.MetricFamilySamples.Sample totalFilesSentSample = + new Collector.MetricFamilySamples.Sample( + filesSent.getMetricName(), + filesSent.getLabelNames(), + labelValues, + totalFilesSent); + + totalFilesSentSamples.add(totalFilesSentSample); + + Collector.MetricFamilySamples.Sample totalSizeToSendSample = + new Collector.MetricFamilySamples.Sample( + sizeToSend.getMetricName(), + sizeToSend.getLabelNames(), + labelValues, + totalSizeToSend); + + totalSizeToSendSamples.add(totalSizeToSendSample); + + Collector.MetricFamilySamples.Sample totalSizeSentSample = + new Collector.MetricFamilySamples.Sample( + sizeSent.getMetricName(), sizeSent.getLabelNames(), labelValues, totalSizeSent); + + totalSizeSentSamples.add(totalSizeSentSample); + } + } + } + + // Receive + MetricFamilySamples filesToReceiveFamily = + new MetricFamilySamples( + filesToReceive.getMetricName(), Type.GAUGE, "", totalFilesToReceiveSamples); + + MetricFamilySamples filesReceivedFamily = + new MetricFamilySamples( + filesReceived.getMetricName(), Type.GAUGE, "", totalFilesReceivedSamples); + + MetricFamilySamples sizeToReceiveFamily = + new MetricFamilySamples( + sizeToReceive.getMetricName(), Type.GAUGE, "", totalSizeToReceiveSamples); + + MetricFamilySamples sizeReceivedFamily = + new MetricFamilySamples( + sizeReceived.getMetricName(), Type.GAUGE, "", totalSizeReceivedSamples); + + // Send + MetricFamilySamples filesToSendFamily = + new MetricFamilySamples( + filesToSend.getMetricName(), Type.GAUGE, "", totalFilesToSendSamples); + + MetricFamilySamples filesSentFamily = + new MetricFamilySamples(filesSent.getMetricName(), Type.GAUGE, "", totalFilesSentSamples); + + MetricFamilySamples sizeToSendFamily = + new MetricFamilySamples(sizeToSend.getMetricName(), Type.GAUGE, "", totalSizeToSendSamples); + + MetricFamilySamples sizeSentFamily = + new MetricFamilySamples(sizeSent.getMetricName(), Type.GAUGE, "", totalSizeSentSamples); + + return Lists.newArrayList( + filesToReceiveFamily, + filesReceivedFamily, + sizeToReceiveFamily, + sizeReceivedFamily, + filesToSendFamily, + filesSentFamily, + sizeToSendFamily, + sizeSentFamily); + } + List getCompactionStats() { // Cassandra's internal CompactionMetrics are close to what we want, but not exactly. @@ -80,20 +272,14 @@ List getCompactionStats() { ArrayList additionalLabels = Lists.newArrayList("keyspace", "table", "compaction_id", "unit", "type"); - // These should be escape handled + // These should be EA targets.. CassandraMetricDefinition protoCompleted = parser.parseDropwizardMetric( - "org_apache_cassandra_metrics_extended_compaction_stats_completed", - "", - additionalLabels, - null); + METRICS_PREFIX + "compaction_stats_completed", "", additionalLabels, null); CassandraMetricDefinition protoTotal = parser.parseDropwizardMetric( - "org_apache_cassandra_metrics_extended_compaction_stats_total", - "", - additionalLabels, - null); + METRICS_PREFIX + "compaction_stats_total", "", additionalLabels, null); List completedSamples = new ArrayList<>(compactions.size() * 2); List totalSamples = new ArrayList<>(compactions.size() * 2); From 192f02089b1a7896a5526c952669de69b6c225f5 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Tue, 5 Sep 2023 12:19:29 +0300 Subject: [PATCH 04/10] Fix NPE --- .../prometheus/CassandraTasksExports.java | 44 ++++++++++++++----- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java index bf871ced..dc95c727 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java @@ -69,35 +69,59 @@ List getStreamInfoStats() { // These should be EA targets, 8 metrics to create CassandraMetricDefinition filesToReceive = parser.parseDropwizardMetric( - METRICS_PREFIX + "streaming_total_files_to_receive", "", additionalLabels, null); + METRICS_PREFIX + "streaming_total_files_to_receive", + "", + additionalLabels, + Lists.newArrayList()); CassandraMetricDefinition filesReceived = parser.parseDropwizardMetric( - METRICS_PREFIX + "streaming_total_files_received", "", additionalLabels, null); + METRICS_PREFIX + "streaming_total_files_received", + "", + additionalLabels, + Lists.newArrayList()); CassandraMetricDefinition sizeToReceive = parser.parseDropwizardMetric( - METRICS_PREFIX + "streaming_total_size_to_receive", "", additionalLabels, null); + METRICS_PREFIX + "streaming_total_size_to_receive", + "", + additionalLabels, + Lists.newArrayList()); CassandraMetricDefinition sizeReceived = parser.parseDropwizardMetric( - METRICS_PREFIX + "streaming_total_size_received", "", additionalLabels, null); + METRICS_PREFIX + "streaming_total_size_received", + "", + additionalLabels, + Lists.newArrayList()); CassandraMetricDefinition filesToSend = parser.parseDropwizardMetric( - METRICS_PREFIX + "streaming_total_files_to_send", "", additionalLabels, null); + METRICS_PREFIX + "streaming_total_files_to_send", + "", + additionalLabels, + Lists.newArrayList()); CassandraMetricDefinition filesSent = parser.parseDropwizardMetric( - METRICS_PREFIX + "streaming_total_files_sent", "", additionalLabels, null); + METRICS_PREFIX + "streaming_total_files_sent", + "", + additionalLabels, + Lists.newArrayList()); CassandraMetricDefinition sizeToSend = parser.parseDropwizardMetric( - METRICS_PREFIX + "streaming_total_size_to_send", "", additionalLabels, null); + METRICS_PREFIX + "streaming_total_size_to_send", + "", + additionalLabels, + Lists.newArrayList()); CassandraMetricDefinition sizeSent = parser.parseDropwizardMetric( - METRICS_PREFIX + "streaming_total_size_sent", "", additionalLabels, null); + METRICS_PREFIX + "streaming_total_size_sent", + "", + additionalLabels, + Lists.newArrayList()); // This is a lot simpler code without all the casting back and forth if description was in the // same place for @@ -275,11 +299,11 @@ List getCompactionStats() { // These should be EA targets.. CassandraMetricDefinition protoCompleted = parser.parseDropwizardMetric( - METRICS_PREFIX + "compaction_stats_completed", "", additionalLabels, null); + METRICS_PREFIX + "compaction_stats_completed", "", additionalLabels, new ArrayList<>()); CassandraMetricDefinition protoTotal = parser.parseDropwizardMetric( - METRICS_PREFIX + "compaction_stats_total", "", additionalLabels, null); + METRICS_PREFIX + "compaction_stats_total", "", additionalLabels, new ArrayList<>()); List completedSamples = new ArrayList<>(compactions.size() * 2); List totalSamples = new ArrayList<>(compactions.size() * 2); From 6f94a4b6d0c9f829a62adec84731995c1af27e69 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Tue, 3 Oct 2023 14:29:24 +0300 Subject: [PATCH 05/10] Improve backwards compatibility --- .../metrics/prometheus/CassandraTasksExports.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java index dc95c727..bd30c129 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java @@ -285,10 +285,14 @@ List getCompactionStats() { .filter( c -> { String taskType = c.get("taskType"); - OperationType operationType = OperationType.valueOf(taskType); - return operationType != OperationType.COUNTER_CACHE_SAVE - && operationType != OperationType.KEY_CACHE_SAVE - && operationType != OperationType.ROW_CACHE_SAVE; + try { + OperationType operationType = OperationType.valueOf(taskType.toUpperCase()); + return operationType != OperationType.COUNTER_CACHE_SAVE + && operationType != OperationType.KEY_CACHE_SAVE + && operationType != OperationType.ROW_CACHE_SAVE; + } catch(IllegalArgumentException e) { + return false; + } }) .collect(Collectors.toList()); From bcd0014cf5a8937e045652dff77531c29b283aad Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Tue, 3 Oct 2023 14:32:27 +0300 Subject: [PATCH 06/10] Fix formatting --- .../io/k8ssandra/metrics/prometheus/CassandraTasksExports.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java index bd30c129..04fefe3d 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java @@ -290,7 +290,7 @@ List getCompactionStats() { return operationType != OperationType.COUNTER_CACHE_SAVE && operationType != OperationType.KEY_CACHE_SAVE && operationType != OperationType.ROW_CACHE_SAVE; - } catch(IllegalArgumentException e) { + } catch (IllegalArgumentException e) { return false; } }) From c55804528aece0cf9db21079f6abe949f30c79d4 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Wed, 4 Oct 2023 16:34:33 +0300 Subject: [PATCH 07/10] Fix labelValues adder (defaultLabelValuse were overwritten), add tests to verify the methods do not error and have correct labelValues and labelNames sizes in the end results also. --- management-api-agent-common/pom.xml | 6 + .../prometheus/CassandraTasksExports.java | 47 +++++--- .../metrics/prometheus/TaskExportsTests.java | 106 ++++++++++++++++++ 3 files changed, 142 insertions(+), 17 deletions(-) create mode 100644 management-api-agent-common/src/test/java/io/k8ssandra/metrics/prometheus/TaskExportsTests.java diff --git a/management-api-agent-common/pom.xml b/management-api-agent-common/pom.xml index 385da8ab..c82f03c2 100644 --- a/management-api-agent-common/pom.xml +++ b/management-api-agent-common/pom.xml @@ -60,6 +60,12 @@ ${junit.version} test + + org.mockito + mockito-core + 5.5.0 + test + io.netty netty-all diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java index 04fefe3d..51bd2a97 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java @@ -7,6 +7,7 @@ import com.codahale.metrics.MetricRegistry; import com.datastax.mgmtapi.ShimLoader; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import io.k8ssandra.metrics.builder.CassandraMetricDefinition; import io.k8ssandra.metrics.builder.CassandraMetricNameParser; @@ -62,6 +63,12 @@ public List describe() { return new ArrayList<>(); } + // Exported here to allow easier testing + @VisibleForTesting + List>>> getStreamInfos() { + return ShimLoader.instance.get().getStreamInfo(); + } + List getStreamInfoStats() { ArrayList additionalLabels = Lists.newArrayList("plan_id", "operation", "peer", "connection"); @@ -128,8 +135,7 @@ List getStreamInfoStats() { // 3.11 and 4.x. Simplify this once 3.11 support is dropped to use // StreamManager.instance.getCurrentStreams() .. - List>>> streamInfos = - ShimLoader.instance.get().getStreamInfo(); + List>>> streamInfos = getStreamInfos(); List totalFilesToReceiveSamples = new ArrayList<>(); List totalFilesReceivedSamples = new ArrayList<>(); @@ -144,12 +150,13 @@ List getStreamInfoStats() { for (Map.Entry>> sessionResults : streamInfo.entrySet()) { String planId = sessionResults.getKey(); for (Map session : sessionResults.getValue()) { - ArrayList labelValues = - Lists.newArrayList( - planId, - session.get("STREAM_OPERATION"), - session.get("PEER"), - session.get("USING_CONNECTION")); + List labelValues = + Lists.newArrayListWithCapacity(filesToReceive.getLabelValues().size() + 4); + labelValues.addAll(filesToReceive.getLabelValues()); + labelValues.add(planId); + labelValues.add(session.get("STREAM_OPERATION")); + labelValues.add(session.get("PEER")); + labelValues.add(session.get("USING_CONNECTION")); long totalFilesToReceive = Long.parseLong(session.get("TOTAL_FILES_TO_RECEIVE")); long totalFilesReceived = Long.parseLong(session.get("TOTAL_FILES_RECEIVED")); @@ -276,17 +283,23 @@ List getStreamInfoStats() { sizeSentFamily); } + List> getCompactions() { + return ShimLoader.instance.get().getCompactionManager().getCompactions(); + } + List getCompactionStats() { // Cassandra's internal CompactionMetrics are close to what we want, but not exactly. // And we can't access CompactionManager.getMetrics() to get them in 3.11 List> compactions = - ShimLoader.instance.get().getCompactionManager().getCompactions().stream() + getCompactions().stream() .filter( c -> { String taskType = c.get("taskType"); try { OperationType operationType = OperationType.valueOf(taskType.toUpperCase()); + // Ignore taskTypes: COUNTER_CACHE_SAVE, KEY_CACHE_SAVE, ROW_CACHE_SAVE (from + // Cassandra 4.1) return operationType != OperationType.COUNTER_CACHE_SAVE && operationType != OperationType.KEY_CACHE_SAVE && operationType != OperationType.ROW_CACHE_SAVE; @@ -296,7 +309,6 @@ List getCompactionStats() { }) .collect(Collectors.toList()); - // Ignore taskTypes: COUNTER_CACHE_SAVE, KEY_CACHE_SAVE, ROW_CACHE_SAVE (from Cassandra 4.1) ArrayList additionalLabels = Lists.newArrayList("keyspace", "table", "compaction_id", "unit", "type"); @@ -312,13 +324,14 @@ List getCompactionStats() { List completedSamples = new ArrayList<>(compactions.size() * 2); List totalSamples = new ArrayList<>(compactions.size() * 2); for (Map c : compactions) { - ArrayList labelValues = - Lists.newArrayList( - c.get("keyspace"), - c.get("columnfamily"), - c.get("compactionId"), - c.get("unit"), - c.get("taskType")); + List labelValues = + Lists.newArrayListWithCapacity(protoCompleted.getLabelValues().size() + 5); + labelValues.addAll(protoCompleted.getLabelValues()); + labelValues.add(c.get("keyspace")); + labelValues.add(c.get("columnfamily")); + labelValues.add(c.get("compactionId")); + labelValues.add(c.get("unit")); + labelValues.add(c.get("taskType")); Collector.MetricFamilySamples.Sample completeSample = new Collector.MetricFamilySamples.Sample( diff --git a/management-api-agent-common/src/test/java/io/k8ssandra/metrics/prometheus/TaskExportsTests.java b/management-api-agent-common/src/test/java/io/k8ssandra/metrics/prometheus/TaskExportsTests.java new file mode 100644 index 00000000..62ed0f10 --- /dev/null +++ b/management-api-agent-common/src/test/java/io/k8ssandra/metrics/prometheus/TaskExportsTests.java @@ -0,0 +1,106 @@ +package io.k8ssandra.metrics.prometheus; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.codahale.metrics.MetricRegistry; +import com.google.common.collect.Lists; +import io.k8ssandra.metrics.config.Configuration; +import io.prometheus.client.Collector; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.cassandra.db.compaction.OperationType; +import org.junit.Test; +import org.mockito.Mockito; + +public class TaskExportsTests { + + @Test + public void testStreamInfoStats() { + MetricRegistry mockRegistry = mock(MetricRegistry.class); + Configuration config = new Configuration(); + CassandraTasksExports exports = new CassandraTasksExports(mockRegistry, config); + CassandraTasksExports spy = Mockito.spy(exports); + Mockito.doReturn(getStreamInfoMock()).when(spy).getStreamInfos(); + when(spy.getStreamInfoStats()).thenCallRealMethod(); + + List streamInfoStats = spy.getStreamInfoStats(); + assertEquals(8, streamInfoStats.size()); + assertEquals(10, streamInfoStats.get(0).samples.size()); + for (Collector.MetricFamilySamples streamInfoStat : streamInfoStats) { + for (Collector.MetricFamilySamples.Sample sample : streamInfoStat.samples) { + assertEquals(9, sample.labelNames.size()); + assertEquals(sample.labelNames.size(), sample.labelValues.size()); + } + } + } + + @Test + public void testCompactionStats() { + MetricRegistry mockRegistry = mock(MetricRegistry.class); + Configuration config = new Configuration(); + CassandraTasksExports exports = new CassandraTasksExports(mockRegistry, config); + CassandraTasksExports spy = Mockito.spy(exports); + Mockito.doReturn(getCompactionsMock()).when(spy).getCompactions(); + when(spy.getCompactionStats()).thenCallRealMethod(); + + assertEquals(2, spy.getCompactionStats().size()); + for (Collector.MetricFamilySamples compactionStat : spy.getCompactionStats()) { + assertEquals(1, compactionStat.samples.size()); + Collector.MetricFamilySamples.Sample sample = compactionStat.samples.get(0); + assertEquals(10, sample.labelNames.size()); + assertEquals(sample.labelValues.size(), sample.labelNames.size()); + } + } + + private List> getCompactionsMock() { + ArrayList> results = Lists.newArrayList(); + + Map ret = new HashMap<>(); + ret.put("id", ""); + ret.put("keyspace", "getKeyspace()"); + ret.put("columnfamily", "getColumnFamily()"); + ret.put("completed", "1"); + ret.put("total", "2"); + ret.put("taskType", OperationType.COMPACTION.name()); + ret.put("unit", "unit.toString()"); + ret.put("compactionId", "compactionId"); + + results.add(ret); + + return results; + } + + private List>>> getStreamInfoMock() { + List>>> result = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + Map>> streamInfo = new HashMap<>(); + List> sessionResults = new ArrayList<>(); + + Map sessionInfo = new HashMap<>(); + sessionInfo.put("STREAM_OPERATION", "testStreaming"); + sessionInfo.put("PEER", "127.0.0.1"); + sessionInfo.put("USING_CONNECTION", "127.0.0.1"); + sessionInfo.put("TOTAL_FILES_TO_RECEIVE", "10"); + sessionInfo.put("TOTAL_FILES_RECEIVED", "9"); + sessionInfo.put("TOTAL_SIZE_TO_RECEIVE", "128"); + sessionInfo.put("TOTAL_SIZE_RECEIVED", "127"); + + sessionInfo.put("TOTAL_FILES_TO_SEND", "5"); + sessionInfo.put("TOTAL_FILES_SENT", "4"); + sessionInfo.put("TOTAL_SIZE_TO_SEND", "512"); + sessionInfo.put("TOTAL_SIZE_SENT", "511"); + sessionResults.add(sessionInfo); + + streamInfo.put("123456", sessionResults); + + result.add(streamInfo); + } + + return result; + } +} From 9d689d0d3ed0703f2352ff9903e2f3865bfcb3fc Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Thu, 5 Oct 2023 09:41:42 +0300 Subject: [PATCH 08/10] Add missing license header --- .../io/k8ssandra/metrics/prometheus/TaskExportsTests.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/management-api-agent-common/src/test/java/io/k8ssandra/metrics/prometheus/TaskExportsTests.java b/management-api-agent-common/src/test/java/io/k8ssandra/metrics/prometheus/TaskExportsTests.java index 62ed0f10..fb084455 100644 --- a/management-api-agent-common/src/test/java/io/k8ssandra/metrics/prometheus/TaskExportsTests.java +++ b/management-api-agent-common/src/test/java/io/k8ssandra/metrics/prometheus/TaskExportsTests.java @@ -1,3 +1,8 @@ +/* + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ package io.k8ssandra.metrics.prometheus; import static org.junit.Assert.assertEquals; From c06a11787d1982c19bc8c2c0be95e51cf572c8ab Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Thu, 5 Oct 2023 16:48:36 +0300 Subject: [PATCH 09/10] Fix naming of _total metric --- .../k8ssandra/metrics/prometheus/CassandraTasksExports.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java index 51bd2a97..ed55820c 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java @@ -342,8 +342,8 @@ List getCompactionStats() { Collector.MetricFamilySamples.Sample totalSample = new Collector.MetricFamilySamples.Sample( - protoCompleted.getMetricName(), - protoCompleted.getLabelNames(), + protoTotal.getMetricName(), + protoTotal.getLabelNames(), labelValues, Double.parseDouble(c.get("total"))); From c4f4b22df31ea57f7c7cb9a7225a395f3bdeded7 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Mon, 9 Oct 2023 09:36:54 +0300 Subject: [PATCH 10/10] Add more defensives --- .../prometheus/CassandraTasksExports.java | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java index ed55820c..740ade44 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java @@ -296,6 +296,15 @@ List getCompactionStats() { .filter( c -> { String taskType = c.get("taskType"); + if (taskType == null) { + // DSE 6.8 renamed the column + taskType = c.get("operationType"); + } + + if (taskType == null) { + return false; + } + try { OperationType operationType = OperationType.valueOf(taskType.toUpperCase()); // Ignore taskTypes: COUNTER_CACHE_SAVE, KEY_CACHE_SAVE, ROW_CACHE_SAVE (from @@ -327,11 +336,24 @@ List getCompactionStats() { List labelValues = Lists.newArrayListWithCapacity(protoCompleted.getLabelValues().size() + 5); labelValues.addAll(protoCompleted.getLabelValues()); + + String compactionId = c.get("compactionId"); + if (compactionId == null) { + // DSE 6.8 renamed this one also + compactionId = c.get("operationId"); + } + + String taskType = c.get("taskType"); + if (taskType == null) { + // DSE 6.8 + taskType = c.get("operationType"); + } + labelValues.add(c.get("keyspace")); labelValues.add(c.get("columnfamily")); - labelValues.add(c.get("compactionId")); + labelValues.add(compactionId); labelValues.add(c.get("unit")); - labelValues.add(c.get("taskType")); + labelValues.add(taskType); Collector.MetricFamilySamples.Sample completeSample = new Collector.MetricFamilySamples.Sample(