diff --git a/management-api-agent-common/pom.xml b/management-api-agent-common/pom.xml
index 385da8ab..c82f03c2 100644
--- a/management-api-agent-common/pom.xml
+++ b/management-api-agent-common/pom.xml
@@ -60,6 +60,12 @@
${junit.version}
test
+
+ org.mockito
+ mockito-core
+ 5.5.0
+ test
+
io.netty
netty-all
diff --git a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java
index f51e4f9e..2e086a15 100644
--- a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java
+++ b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java
@@ -10,11 +10,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.cassandra.utils.Pair;
public class JobExecutor {
ExecutorService executorService = Executors.newFixedThreadPool(1);
- Cache jobCache = CacheBuilder.newBuilder().maximumSize(1000).build();
+ Cache jobCache = CacheBuilder.newBuilder().recordStats().maximumSize(1000).build();
public Pair> submit(String jobType, Runnable runnable) {
// Where do I create the job details? Here? Add it to the Cache first?
@@ -45,4 +46,12 @@ public Pair> submit(String jobType, Runnable run
public Job getJobWithId(String jobId) {
return jobCache.getIfPresent(jobId);
}
+
+ public int runningTasks() {
+ return ((ThreadPoolExecutor) executorService).getActiveCount();
+ }
+
+ public int queuedTasks() {
+ return ((ThreadPoolExecutor) executorService).getQueue().size();
+ }
}
diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricNameParser.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricNameParser.java
index 5bdd3d21..fa289dd0 100644
--- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricNameParser.java
+++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricNameParser.java
@@ -42,6 +42,13 @@ public CassandraMetricNameParser(
}
}
+ public static CassandraMetricNameParser getDefaultParser(Configuration config) {
+ return new CassandraMetricNameParser(
+ CassandraMetricsTools.DEFAULT_LABEL_NAMES,
+ CassandraMetricsTools.DEFAULT_LABEL_VALUES,
+ config);
+ }
+
private void parseEnvVariablesAsLabels(Map envSettings) {
for (Map.Entry entry : envSettings.entrySet()) {
String envValue = System.getenv(entry.getValue());
diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java
index 7e8b515e..30c0eb00 100644
--- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java
+++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java
@@ -77,11 +77,7 @@ public class CassandraMetricRegistryListener implements MetricRegistryListener {
public CassandraMetricRegistryListener(
ConcurrentHashMap familyCache, Configuration config)
throws NoSuchMethodException {
- parser =
- new CassandraMetricNameParser(
- CassandraMetricsTools.DEFAULT_LABEL_NAMES,
- CassandraMetricsTools.DEFAULT_LABEL_VALUES,
- config);
+ parser = CassandraMetricNameParser.getDefaultParser(config);
cache = new ConcurrentHashMap<>();
this.familyCache = familyCache;
diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/config/Configuration.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/config/Configuration.java
index a6aa08db..094f9795 100644
--- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/config/Configuration.java
+++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/config/Configuration.java
@@ -21,6 +21,9 @@ public class Configuration {
@JsonProperty("labels")
private LabelConfiguration labels;
+ @JsonProperty("extended_metrics_disabled")
+ private boolean extendedDisabled;
+
public Configuration() {
relabels = new ArrayList<>();
}
@@ -44,4 +47,12 @@ public void setRelabels(List relabels) {
public void setLabels(LabelConfiguration labels) {
this.labels = labels;
}
+
+ public boolean isExtendedDisabled() {
+ return extendedDisabled;
+ }
+
+ public void setExtendedDisabled(boolean extendedDisabled) {
+ this.extendedDisabled = extendedDisabled;
+ }
}
diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java
index 18512f34..c4f35938 100644
--- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java
+++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java
@@ -9,6 +9,7 @@
import io.k8ssandra.metrics.config.Configuration;
import io.k8ssandra.metrics.http.NettyMetricsHttpServer;
import io.k8ssandra.metrics.prometheus.CassandraDropwizardExports;
+import io.k8ssandra.metrics.prometheus.CassandraTasksExports;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.prometheus.client.hotspot.DefaultExports;
@@ -72,6 +73,11 @@ public static void intercept(@SuperCall Callable zuper) throws Exception {
// Add JVM metrics
DefaultExports.initialize();
+ // Add task metrics
+ if (!config.isExtendedDisabled()) {
+ new CassandraTasksExports(CassandraMetricsRegistry.Metrics, config).register();
+ }
+
// Create /metrics handler. Note, this doesn't support larger than nThreads=1
final EventLoopGroup httpGroup = new EpollEventLoopGroup(1);
@@ -81,12 +87,7 @@ public static void intercept(@SuperCall Callable zuper) throws Exception {
logger.info("Metrics collector started");
- Runtime.getRuntime()
- .addShutdownHook(
- new Thread(
- () -> {
- httpGroup.shutdownGracefully();
- }));
+ Runtime.getRuntime().addShutdownHook(new Thread(httpGroup::shutdownGracefully));
} catch (Throwable t) {
logger.error("Unable to start metrics endpoint", t);
}
diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java
new file mode 100644
index 00000000..740ade44
--- /dev/null
+++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java
@@ -0,0 +1,384 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Please see the included license file for details.
+ */
+package io.k8ssandra.metrics.prometheus;
+
+import com.codahale.metrics.MetricRegistry;
+import com.datastax.mgmtapi.ShimLoader;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import io.k8ssandra.metrics.builder.CassandraMetricDefinition;
+import io.k8ssandra.metrics.builder.CassandraMetricNameParser;
+import io.k8ssandra.metrics.config.Configuration;
+import io.prometheus.client.Collector;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Collect non-metrics information from Cassandra and turn them to metrics. This is considerably
+ * slower to collect than the metrics we currently ship.
+ */
+public class CassandraTasksExports extends Collector implements Collector.Describable {
+ private static final org.slf4j.Logger logger =
+ LoggerFactory.getLogger(CassandraTasksExports.class);
+
+ private static final String METRICS_PREFIX = "org_apache_cassandra_metrics_extended_";
+ private final MetricRegistry registry;
+
+ private final CassandraMetricNameParser parser;
+
+ public CassandraTasksExports(MetricRegistry registry, Configuration config) {
+ this.registry = registry;
+ parser = CassandraMetricNameParser.getDefaultParser(config);
+ }
+
+ @Override
+ public List collect() {
+
+ ArrayList familySamples = Lists.newArrayList();
+
+ // Collect Compaction Task metrics
+ familySamples.addAll(getCompactionStats());
+
+ // Collect active streaming sessions
+ familySamples.addAll(getStreamInfoStats());
+
+ // Collect other sstableOperations (if not part of Compactions metrics already)
+
+ // Collect JobExecutor tasks
+
+ // Collect MBean ones not exposed currently in CassandraMetrics / 3.11
+
+ return familySamples;
+ }
+
+ @Override
+ public List describe() {
+ return new ArrayList<>();
+ }
+
+ // Exported here to allow easier testing
+ @VisibleForTesting
+ List