diff --git a/management-api-agent-common/pom.xml b/management-api-agent-common/pom.xml
index 385da8ab..c82f03c2 100644
--- a/management-api-agent-common/pom.xml
+++ b/management-api-agent-common/pom.xml
@@ -60,6 +60,12 @@
       <version>${junit.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>5.5.0</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
diff --git a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java
index f51e4f9e..2e086a15 100644
--- a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java
+++ b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java
@@ -10,11 +10,12 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.cassandra.utils.Pair;
 
 public class JobExecutor {
   ExecutorService executorService = Executors.newFixedThreadPool(1);
-  Cache<String, Job> jobCache = CacheBuilder.newBuilder().maximumSize(1000).build();
+  Cache<String, Job> jobCache = CacheBuilder.newBuilder().recordStats().maximumSize(1000).build();
 
   public Pair<String, CompletableFuture<Void>> submit(String jobType, Runnable runnable) {
     // Where do I create the job details? Here? Add it to the Cache first?
@@ -45,4 +46,12 @@ public Pair<String, CompletableFuture<Void>> submit(String jobType, Runnable run
   public Job getJobWithId(String jobId) {
     return jobCache.getIfPresent(jobId);
   }
+
+  public int runningTasks() {
+    return ((ThreadPoolExecutor) executorService).getActiveCount();
+  }
+
+  public int queuedTasks() {
+    return ((ThreadPoolExecutor) executorService).getQueue().size();
+  }
 }
diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricNameParser.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricNameParser.java
index 5bdd3d21..fa289dd0 100644
--- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricNameParser.java
+++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricNameParser.java
@@ -42,6 +42,13 @@ public CassandraMetricNameParser(
     }
   }
+
+  public static CassandraMetricNameParser getDefaultParser(Configuration config) {
+    return new CassandraMetricNameParser(
+        CassandraMetricsTools.DEFAULT_LABEL_NAMES,
+        CassandraMetricsTools.DEFAULT_LABEL_VALUES,
+        config);
+  }
+
   private void parseEnvVariablesAsLabels(Map<String, String> envSettings) {
     for (Map.Entry<String, String> entry : envSettings.entrySet()) {
       String envValue = System.getenv(entry.getValue());
diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java
index 7e8b515e..30c0eb00 100644
--- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java
+++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/builder/CassandraMetricRegistryListener.java
@@ -77,11 +77,7 @@ public class CassandraMetricRegistryListener implements MetricRegistryListener {
   public CassandraMetricRegistryListener(
       ConcurrentHashMap<String, RefreshableMetricFamilySamples> familyCache, Configuration config)
       throws NoSuchMethodException {
-    parser =
-        new CassandraMetricNameParser(
-            CassandraMetricsTools.DEFAULT_LABEL_NAMES,
-            CassandraMetricsTools.DEFAULT_LABEL_VALUES,
-            config);
+    parser = CassandraMetricNameParser.getDefaultParser(config);
     cache = new ConcurrentHashMap<>();
     this.familyCache = familyCache;
diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/config/Configuration.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/config/Configuration.java
index a6aa08db..094f9795 100644
--- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/config/Configuration.java
+++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/config/Configuration.java
@@ -21,6 +21,9 @@ public class Configuration {
   @JsonProperty("labels")
   private LabelConfiguration labels;
 
+  @JsonProperty("extended_metrics_disabled")
+  private boolean extendedDisabled;
+
   public Configuration() {
     relabels = new ArrayList<>();
   }
@@ -44,4 +47,12 @@ public void setRelabels(List<RelabelSpec> relabels) {
   public void setLabels(LabelConfiguration labels) {
     this.labels = labels;
   }
+
+  public boolean isExtendedDisabled() {
+    return extendedDisabled;
+  }
+
+  public void setExtendedDisabled(boolean extendedDisabled) {
+    this.extendedDisabled = extendedDisabled;
+  }
 }
diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java
index 18512f34..c4f35938 100644
--- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java
+++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java
@@ -9,6 +9,7 @@
 import io.k8ssandra.metrics.config.Configuration;
 import io.k8ssandra.metrics.http.NettyMetricsHttpServer;
 import io.k8ssandra.metrics.prometheus.CassandraDropwizardExports;
+import io.k8ssandra.metrics.prometheus.CassandraTasksExports;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.prometheus.client.hotspot.DefaultExports;
@@ -72,6 +73,11 @@ public static void intercept(@SuperCall Callable<Void> zuper) throws Exception {
       // Add JVM metrics
       DefaultExports.initialize();
 
+      // Add task metrics
+      if (!config.isExtendedDisabled()) {
+        new CassandraTasksExports(CassandraMetricsRegistry.Metrics, config).register();
+      }
+
       // Create /metrics handler. Note, this doesn't support larger than nThreads=1
       final EventLoopGroup httpGroup = new EpollEventLoopGroup(1);
 
@@ -81,12 +87,7 @@ public static void intercept(@SuperCall Callable<Void> zuper) throws Exception {
 
       logger.info("Metrics collector started");
 
-      Runtime.getRuntime()
-          .addShutdownHook(
-              new Thread(
-                  () -> {
-                    httpGroup.shutdownGracefully();
-                  }));
+      Runtime.getRuntime().addShutdownHook(new Thread(httpGroup::shutdownGracefully));
     } catch (Throwable t) {
       logger.error("Unable to start metrics endpoint", t);
     }
diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java
new file mode 100644
index 00000000..740ade44
--- /dev/null
+++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java
@@ -0,0 +1,384 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Please see the included license file for details.
+ */
+package io.k8ssandra.metrics.prometheus;
+
+import com.codahale.metrics.MetricRegistry;
+import com.datastax.mgmtapi.ShimLoader;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import io.k8ssandra.metrics.builder.CassandraMetricDefinition;
+import io.k8ssandra.metrics.builder.CassandraMetricNameParser;
+import io.k8ssandra.metrics.config.Configuration;
+import io.prometheus.client.Collector;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Collect non-metrics information from Cassandra and turn them to metrics. This is considerably
+ * slower to collect than the metrics we currently ship.
+ */
+public class CassandraTasksExports extends Collector implements Collector.Describable {
+  private static final org.slf4j.Logger logger =
+      LoggerFactory.getLogger(CassandraTasksExports.class);
+
+  private static final String METRICS_PREFIX = "org_apache_cassandra_metrics_extended_";
+  private final MetricRegistry registry;
+
+  private final CassandraMetricNameParser parser;
+
+  public CassandraTasksExports(MetricRegistry registry, Configuration config) {
+    this.registry = registry;
+    parser = CassandraMetricNameParser.getDefaultParser(config);
+  }
+
+  @Override
+  public List<MetricFamilySamples> collect() {
+
+    ArrayList<MetricFamilySamples> familySamples = Lists.newArrayList();
+
+    // Collect Compaction Task metrics
+    familySamples.addAll(getCompactionStats());
+
+    // Collect active streaming sessions
+    familySamples.addAll(getStreamInfoStats());
+
+    // Collect other sstableOperations (if not part of Compactions metrics already)
+
+    // Collect JobExecutor tasks
+
+    // Collect MBean ones not exposed currently in CassandraMetrics / 3.11
+
+    return familySamples;
+  }
+
+  @Override
+  public List<MetricFamilySamples> describe() {
+    return new ArrayList<>();
+  }
+
+  // Exported here to allow easier testing
+  @VisibleForTesting
+  List<Map<String, List<Map<String, String>>>> getStreamInfos() {
+    return ShimLoader.instance.get().getStreamInfo();
+  }
+
+  List<MetricFamilySamples> getStreamInfoStats() {
+    ArrayList<String> additionalLabels =
+        Lists.newArrayList("plan_id", "operation", "peer", "connection");
+
+    // These should be EA targets, 8 metrics to create
+    CassandraMetricDefinition filesToReceive =
+        parser.parseDropwizardMetric(
+            METRICS_PREFIX + "streaming_total_files_to_receive",
+            "",
+            additionalLabels,
+            Lists.newArrayList());
+
+    CassandraMetricDefinition filesReceived =
+        parser.parseDropwizardMetric(
+            METRICS_PREFIX + "streaming_total_files_received",
+            "",
+            additionalLabels,
+            Lists.newArrayList());
+
+    CassandraMetricDefinition sizeToReceive =
+        parser.parseDropwizardMetric(
+            METRICS_PREFIX + "streaming_total_size_to_receive",
+            "",
+            additionalLabels,
+            Lists.newArrayList());
+
+    CassandraMetricDefinition sizeReceived =
+        parser.parseDropwizardMetric(
+            METRICS_PREFIX + "streaming_total_size_received",
+            "",
+            additionalLabels,
+            Lists.newArrayList());
+
+    CassandraMetricDefinition filesToSend =
+        parser.parseDropwizardMetric(
+            METRICS_PREFIX + "streaming_total_files_to_send",
+            "",
+            additionalLabels,
+            Lists.newArrayList());
+
+    CassandraMetricDefinition filesSent =
+        parser.parseDropwizardMetric(
+            METRICS_PREFIX + "streaming_total_files_sent",
+            "",
+            additionalLabels,
+            Lists.newArrayList());
+
+    CassandraMetricDefinition sizeToSend =
+        parser.parseDropwizardMetric(
+            METRICS_PREFIX + "streaming_total_size_to_send",
+            "",
+            additionalLabels,
+            Lists.newArrayList());
+
+    CassandraMetricDefinition sizeSent =
+        parser.parseDropwizardMetric(
+            METRICS_PREFIX + "streaming_total_size_sent",
+            "",
+            additionalLabels,
+            Lists.newArrayList());
+
+    // This is a lot simpler code without all the casting back and forth if description was in the
+    // same place for
+    // 3.11 and 4.x. Simplify this once 3.11 support is dropped to use
+    // StreamManager.instance.getCurrentStreams() ..
+
+    List<Map<String, List<Map<String, String>>>> streamInfos = getStreamInfos();
+
+    List<Collector.MetricFamilySamples.Sample> totalFilesToReceiveSamples = new ArrayList<>();
+    List<Collector.MetricFamilySamples.Sample> totalFilesReceivedSamples = new ArrayList<>();
+    List<Collector.MetricFamilySamples.Sample> totalSizeToReceiveSamples = new ArrayList<>();
+    List<Collector.MetricFamilySamples.Sample> totalSizeReceivedSamples = new ArrayList<>();
+    List<Collector.MetricFamilySamples.Sample> totalFilesToSendSamples = new ArrayList<>();
+    List<Collector.MetricFamilySamples.Sample> totalFilesSentSamples = new ArrayList<>();
+    List<Collector.MetricFamilySamples.Sample> totalSizeToSendSamples = new ArrayList<>();
+    List<Collector.MetricFamilySamples.Sample> totalSizeSentSamples = new ArrayList<>();
+
+    for (Map<String, List<Map<String, String>>> streamInfo : streamInfos) {
+      for (Map.Entry<String, List<Map<String, String>>> sessionResults : streamInfo.entrySet()) {
+        String planId = sessionResults.getKey();
+        for (Map<String, String> session : sessionResults.getValue()) {
+          List<String> labelValues =
+              Lists.newArrayListWithCapacity(filesToReceive.getLabelValues().size() + 4);
+          labelValues.addAll(filesToReceive.getLabelValues());
+          labelValues.add(planId);
+          labelValues.add(session.get("STREAM_OPERATION"));
+          labelValues.add(session.get("PEER"));
+          labelValues.add(session.get("USING_CONNECTION"));
+
+          long totalFilesToReceive = Long.parseLong(session.get("TOTAL_FILES_TO_RECEIVE"));
+          long totalFilesReceived = Long.parseLong(session.get("TOTAL_FILES_RECEIVED"));
+          long totalSizeToReceive = Long.parseLong(session.get("TOTAL_SIZE_TO_RECEIVE"));
+          long totalSizeReceived = Long.parseLong(session.get("TOTAL_SIZE_RECEIVED"));
+          long totalFilesToSend = Long.parseLong(session.get("TOTAL_FILES_TO_SEND"));
+          long totalFilesSent = Long.parseLong(session.get("TOTAL_FILES_SENT"));
+          long totalSizeToSend = Long.parseLong(session.get("TOTAL_SIZE_TO_SEND"));
+          long totalSizeSent = Long.parseLong(session.get("TOTAL_SIZE_SENT"));
+
+          // Receive samples
+          Collector.MetricFamilySamples.Sample totalFilesToReceiveSample =
+              new Collector.MetricFamilySamples.Sample(
+                  filesToReceive.getMetricName(),
+                  filesToReceive.getLabelNames(),
+                  labelValues,
+                  totalFilesToReceive);
+
+          totalFilesToReceiveSamples.add(totalFilesToReceiveSample);
+
+          Collector.MetricFamilySamples.Sample totalFilesReceivedSample =
+              new Collector.MetricFamilySamples.Sample(
+                  filesReceived.getMetricName(),
+                  filesReceived.getLabelNames(),
+                  labelValues,
+                  totalFilesReceived);
+
+          totalFilesReceivedSamples.add(totalFilesReceivedSample);
+
+          Collector.MetricFamilySamples.Sample totalSizeToReceiveSample =
+              new Collector.MetricFamilySamples.Sample(
+                  sizeToReceive.getMetricName(),
+                  sizeToReceive.getLabelNames(),
+                  labelValues,
+                  totalSizeToReceive);
+
+          totalSizeToReceiveSamples.add(totalSizeToReceiveSample);
+
+          Collector.MetricFamilySamples.Sample totalSizeReceivedSample =
+              new Collector.MetricFamilySamples.Sample(
+                  sizeReceived.getMetricName(),
+                  sizeReceived.getLabelNames(),
+                  labelValues,
+                  totalSizeReceived);
+
+          totalSizeReceivedSamples.add(totalSizeReceivedSample);
+
+          // Send samples
+          Collector.MetricFamilySamples.Sample totalFilesToSendSample =
+              new Collector.MetricFamilySamples.Sample(
+                  filesToSend.getMetricName(),
+                  filesToSend.getLabelNames(),
+                  labelValues,
+                  totalFilesToSend);
+
+          totalFilesToSendSamples.add(totalFilesToSendSample);
+
+          Collector.MetricFamilySamples.Sample totalFilesSentSample =
+              new Collector.MetricFamilySamples.Sample(
+                  filesSent.getMetricName(),
+                  filesSent.getLabelNames(),
+                  labelValues,
+                  totalFilesSent);
+
+          totalFilesSentSamples.add(totalFilesSentSample);
+
+          Collector.MetricFamilySamples.Sample totalSizeToSendSample =
+              new Collector.MetricFamilySamples.Sample(
+                  sizeToSend.getMetricName(),
+                  sizeToSend.getLabelNames(),
+                  labelValues,
+                  totalSizeToSend);
+
+          totalSizeToSendSamples.add(totalSizeToSendSample);
+
+          Collector.MetricFamilySamples.Sample totalSizeSentSample =
+              new Collector.MetricFamilySamples.Sample(
+                  sizeSent.getMetricName(), sizeSent.getLabelNames(), labelValues, totalSizeSent);
+
+          totalSizeSentSamples.add(totalSizeSentSample);
+        }
+      }
+    }
+
+    // Receive
+    MetricFamilySamples filesToReceiveFamily =
+        new MetricFamilySamples(
+            filesToReceive.getMetricName(), Type.GAUGE, "", totalFilesToReceiveSamples);
+
+    MetricFamilySamples filesReceivedFamily =
+        new MetricFamilySamples(
+            filesReceived.getMetricName(), Type.GAUGE, "", totalFilesReceivedSamples);
+
+    MetricFamilySamples sizeToReceiveFamily =
+        new MetricFamilySamples(
+            sizeToReceive.getMetricName(), Type.GAUGE, "", totalSizeToReceiveSamples);
+
+    MetricFamilySamples sizeReceivedFamily =
+        new MetricFamilySamples(
+            sizeReceived.getMetricName(), Type.GAUGE, "", totalSizeReceivedSamples);
+
+    // Send
+    MetricFamilySamples filesToSendFamily =
+        new MetricFamilySamples(
+            filesToSend.getMetricName(), Type.GAUGE, "", totalFilesToSendSamples);
+
+    MetricFamilySamples filesSentFamily =
+        new MetricFamilySamples(filesSent.getMetricName(), Type.GAUGE, "", totalFilesSentSamples);
+
+    MetricFamilySamples sizeToSendFamily =
+        new MetricFamilySamples(sizeToSend.getMetricName(), Type.GAUGE, "", totalSizeToSendSamples);
+
+    MetricFamilySamples sizeSentFamily =
+        new MetricFamilySamples(sizeSent.getMetricName(), Type.GAUGE, "", totalSizeSentSamples);
+
+    return Lists.newArrayList(
+        filesToReceiveFamily,
+        filesReceivedFamily,
+        sizeToReceiveFamily,
+        sizeReceivedFamily,
+        filesToSendFamily,
+        filesSentFamily,
+        sizeToSendFamily,
+        sizeSentFamily);
+  }
+
+  List<Map<String, String>> getCompactions() {
+    return ShimLoader.instance.get().getCompactionManager().getCompactions();
+  }
+
+  List<MetricFamilySamples> getCompactionStats() {
+
+    // Cassandra's internal CompactionMetrics are close to what we want, but not exactly.
+    // And we can't access CompactionManager.getMetrics() to get them in 3.11
+    List<Map<String, String>> compactions =
+        getCompactions().stream()
+            .filter(
+                c -> {
+                  String taskType = c.get("taskType");
+                  if (taskType == null) {
+                    // DSE 6.8 renamed the column
+                    taskType = c.get("operationType");
+                  }
+
+                  if (taskType == null) {
+                    return false;
+                  }
+
+                  try {
+                    OperationType operationType = OperationType.valueOf(taskType.toUpperCase());
+                    // Ignore taskTypes: COUNTER_CACHE_SAVE, KEY_CACHE_SAVE, ROW_CACHE_SAVE (from
+                    // Cassandra 4.1)
+                    return operationType != OperationType.COUNTER_CACHE_SAVE
+                        && operationType != OperationType.KEY_CACHE_SAVE
+                        && operationType != OperationType.ROW_CACHE_SAVE;
+                  } catch (IllegalArgumentException e) {
+                    return false;
+                  }
+                })
+            .collect(Collectors.toList());
+
+    ArrayList<String> additionalLabels =
+        Lists.newArrayList("keyspace", "table", "compaction_id", "unit", "type");
+
+    // These should be EA targets..
+    CassandraMetricDefinition protoCompleted =
+        parser.parseDropwizardMetric(
+            METRICS_PREFIX + "compaction_stats_completed", "", additionalLabels, new ArrayList<>());
+
+    CassandraMetricDefinition protoTotal =
+        parser.parseDropwizardMetric(
+            METRICS_PREFIX + "compaction_stats_total", "", additionalLabels, new ArrayList<>());
+
+    List<Collector.MetricFamilySamples.Sample> completedSamples = new ArrayList<>(compactions.size() * 2);
+    List<Collector.MetricFamilySamples.Sample> totalSamples = new ArrayList<>(compactions.size() * 2);
+    for (Map<String, String> c : compactions) {
+      List<String> labelValues =
+          Lists.newArrayListWithCapacity(protoCompleted.getLabelValues().size() + 5);
+      labelValues.addAll(protoCompleted.getLabelValues());
+
+      String compactionId = c.get("compactionId");
+      if (compactionId == null) {
+        // DSE 6.8 renamed this one also
+        compactionId = c.get("operationId");
+      }
+
+      String taskType = c.get("taskType");
+      if (taskType == null) {
+        // DSE 6.8
+        taskType = c.get("operationType");
+      }
+
+      labelValues.add(c.get("keyspace"));
+      labelValues.add(c.get("columnfamily"));
+      labelValues.add(compactionId);
+      labelValues.add(c.get("unit"));
+      labelValues.add(taskType);
+
+      Collector.MetricFamilySamples.Sample completeSample =
+          new Collector.MetricFamilySamples.Sample(
+              protoCompleted.getMetricName(),
+              protoCompleted.getLabelNames(),
+              labelValues,
+              Double.parseDouble(c.get("completed")));
+
+      Collector.MetricFamilySamples.Sample totalSample =
+          new Collector.MetricFamilySamples.Sample(
+              protoTotal.getMetricName(),
+              protoTotal.getLabelNames(),
+              labelValues,
+              Double.parseDouble(c.get("total")));
+
+      completedSamples.add(completeSample);
+      totalSamples.add(totalSample);
+    }
+
+    MetricFamilySamples completeFamily =
+        new MetricFamilySamples(protoCompleted.getMetricName(), Type.GAUGE, "", completedSamples);
+
+    MetricFamilySamples totalFamily =
+        new MetricFamilySamples(protoTotal.getMetricName(), Type.GAUGE, "", totalSamples);
+
+    return Lists.newArrayList(completeFamily, totalFamily);
+  }
+}
diff --git a/management-api-agent-common/src/main/resources/default-metric-settings.yaml b/management-api-agent-common/src/main/resources/default-metric-settings.yaml
index c91a54ee..1890e305 100644
--- a/management-api-agent-common/src/main/resources/default-metric-settings.yaml
+++ b/management-api-agent-common/src/main/resources/default-metric-settings.yaml
@@ -163,3 +163,4 @@ labels:
   env:
     pod_name: "POD_NAME"
     node_name: "NODE_NAME"
+extended_metrics_disabled: false
diff --git a/management-api-agent-common/src/test/java/io/k8ssandra/metrics/config/ConfigReaderTest.java b/management-api-agent-common/src/test/java/io/k8ssandra/metrics/config/ConfigReaderTest.java
index b4c03b76..3c0b9177 100644
--- a/management-api-agent-common/src/test/java/io/k8ssandra/metrics/config/ConfigReaderTest.java
+++ b/management-api-agent-common/src/test/java/io/k8ssandra/metrics/config/ConfigReaderTest.java
@@ -6,8 +6,10 @@
 package io.k8ssandra.metrics.config;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.net.URL;
 import org.junit.Test;
@@ -21,6 +23,7 @@ public void readEmptyConfig() {
     Configuration configuration = ConfigReader.readCustomConfig();
     assertEquals(0, configuration.getRelabels().size());
     assertNull(configuration.getEndpointConfiguration());
+    assertFalse(configuration.isExtendedDisabled());
   }
 
   @Test
@@ -58,6 +61,8 @@ public void readFromTLSConfigFile() {
     assertEquals("/etc/ssl/ca.crt", tlsConfig.getCaCertPath());
     assertEquals("/etc/ssl/tls.crt", tlsConfig.getTlsCertPath());
     assertEquals("/etc/ssl/tls.key", tlsConfig.getTlsKeyPath());
+
+    assertTrue(configuration.isExtendedDisabled());
   }
 
   @Test
diff --git a/management-api-agent-common/src/test/java/io/k8ssandra/metrics/prometheus/TaskExportsTests.java b/management-api-agent-common/src/test/java/io/k8ssandra/metrics/prometheus/TaskExportsTests.java
new file mode 100644
index 00000000..fb084455
--- /dev/null
+++ b/management-api-agent-common/src/test/java/io/k8ssandra/metrics/prometheus/TaskExportsTests.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Please see the included license file for details.
+ */
+package io.k8ssandra.metrics.prometheus;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.collect.Lists;
+import io.k8ssandra.metrics.config.Configuration;
+import io.prometheus.client.Collector;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TaskExportsTests {
+
+  @Test
+  public void testStreamInfoStats() {
+    MetricRegistry mockRegistry = mock(MetricRegistry.class);
+    Configuration config = new Configuration();
+    CassandraTasksExports exports = new CassandraTasksExports(mockRegistry, config);
+    CassandraTasksExports spy = Mockito.spy(exports);
+    Mockito.doReturn(getStreamInfoMock()).when(spy).getStreamInfos();
+    when(spy.getStreamInfoStats()).thenCallRealMethod();
+
+    List<Collector.MetricFamilySamples> streamInfoStats = spy.getStreamInfoStats();
+    assertEquals(8, streamInfoStats.size());
+    assertEquals(10, streamInfoStats.get(0).samples.size());
+    for (Collector.MetricFamilySamples streamInfoStat : streamInfoStats) {
+      for (Collector.MetricFamilySamples.Sample sample : streamInfoStat.samples) {
+        assertEquals(9, sample.labelNames.size());
+        assertEquals(sample.labelNames.size(), sample.labelValues.size());
+      }
+    }
+  }
+
+  @Test
+  public void testCompactionStats() {
+    MetricRegistry mockRegistry = mock(MetricRegistry.class);
+    Configuration config = new Configuration();
+    CassandraTasksExports exports = new CassandraTasksExports(mockRegistry, config);
+    CassandraTasksExports spy = Mockito.spy(exports);
+    Mockito.doReturn(getCompactionsMock()).when(spy).getCompactions();
+    when(spy.getCompactionStats()).thenCallRealMethod();
+
+    assertEquals(2, spy.getCompactionStats().size());
+    for (Collector.MetricFamilySamples compactionStat : spy.getCompactionStats()) {
+      assertEquals(1, compactionStat.samples.size());
+      Collector.MetricFamilySamples.Sample sample = compactionStat.samples.get(0);
+      assertEquals(10, sample.labelNames.size());
+      assertEquals(sample.labelValues.size(), sample.labelNames.size());
+    }
+  }
+
+  private List<Map<String, String>> getCompactionsMock() {
+    ArrayList<Map<String, String>> results = Lists.newArrayList();
+
+    Map<String, String> ret = new HashMap<>();
+    ret.put("id", "");
+    ret.put("keyspace", "getKeyspace()");
+    ret.put("columnfamily", "getColumnFamily()");
+    ret.put("completed", "1");
+    ret.put("total", "2");
+    ret.put("taskType", OperationType.COMPACTION.name());
+    ret.put("unit", "unit.toString()");
+    ret.put("compactionId", "compactionId");
+
+    results.add(ret);
+
+    return results;
+  }
+
+  private List<Map<String, List<Map<String, String>>>> getStreamInfoMock() {
+    List<Map<String, List<Map<String, String>>>> result = new ArrayList<>();
+
+    for (int i = 0; i < 10; i++) {
+      Map<String, List<Map<String, String>>> streamInfo = new HashMap<>();
+      List<Map<String, String>> sessionResults = new ArrayList<>();
+
+      Map<String, String> sessionInfo = new HashMap<>();
+      sessionInfo.put("STREAM_OPERATION", "testStreaming");
+      sessionInfo.put("PEER", "127.0.0.1");
+      sessionInfo.put("USING_CONNECTION", "127.0.0.1");
+      sessionInfo.put("TOTAL_FILES_TO_RECEIVE", "10");
+      sessionInfo.put("TOTAL_FILES_RECEIVED", "9");
+      sessionInfo.put("TOTAL_SIZE_TO_RECEIVE", "128");
+      sessionInfo.put("TOTAL_SIZE_RECEIVED", "127");
+
+      sessionInfo.put("TOTAL_FILES_TO_SEND", "5");
+      sessionInfo.put("TOTAL_FILES_SENT", "4");
+      sessionInfo.put("TOTAL_SIZE_TO_SEND", "512");
+      sessionInfo.put("TOTAL_SIZE_SENT", "511");
+      sessionResults.add(sessionInfo);
+
+      streamInfo.put("123456", sessionResults);
+
+      result.add(streamInfo);
+    }
+
+    return result;
+  }
+}
diff --git a/management-api-agent-common/src/test/resources/collector_tls.yaml b/management-api-agent-common/src/test/resources/collector_tls.yaml
index 941d6f34..0225f07d 100644
--- a/management-api-agent-common/src/test/resources/collector_tls.yaml
+++ b/management-api-agent-common/src/test/resources/collector_tls.yaml
@@ -13,3 +13,4 @@ relabels:
     separator: ","
     regex: "^(a|b|c),.*"
     action: "drop"
+extended_metrics_disabled: true
\ No newline at end of file