From c55804528aece0cf9db21079f6abe949f30c79d4 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Wed, 4 Oct 2023 16:34:33 +0300 Subject: [PATCH] Fix labelValues adder (defaultLabelValuse were overwritten), add tests to verify the methods do not error and have correct labelValues and labelNames sizes in the end results also. --- management-api-agent-common/pom.xml | 6 + .../prometheus/CassandraTasksExports.java | 47 +++++--- .../metrics/prometheus/TaskExportsTests.java | 106 ++++++++++++++++++ 3 files changed, 142 insertions(+), 17 deletions(-) create mode 100644 management-api-agent-common/src/test/java/io/k8ssandra/metrics/prometheus/TaskExportsTests.java diff --git a/management-api-agent-common/pom.xml b/management-api-agent-common/pom.xml index 385da8ab..c82f03c2 100644 --- a/management-api-agent-common/pom.xml +++ b/management-api-agent-common/pom.xml @@ -60,6 +60,12 @@ ${junit.version} test + + org.mockito + mockito-core + 5.5.0 + test + io.netty netty-all diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java index 04fefe3d..51bd2a97 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java @@ -7,6 +7,7 @@ import com.codahale.metrics.MetricRegistry; import com.datastax.mgmtapi.ShimLoader; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import io.k8ssandra.metrics.builder.CassandraMetricDefinition; import io.k8ssandra.metrics.builder.CassandraMetricNameParser; @@ -62,6 +63,12 @@ public List describe() { return new ArrayList<>(); } + // Exported here to allow easier testing + @VisibleForTesting + List>>> getStreamInfos() { + return ShimLoader.instance.get().getStreamInfo(); + } + List getStreamInfoStats() { ArrayList additionalLabels = Lists.newArrayList("plan_id", "operation", "peer", "connection"); @@ -128,8 +135,7 @@ List getStreamInfoStats() { // 3.11 and 4.x. Simplify this once 3.11 support is dropped to use // StreamManager.instance.getCurrentStreams() .. - List>>> streamInfos = - ShimLoader.instance.get().getStreamInfo(); + List>>> streamInfos = getStreamInfos(); List totalFilesToReceiveSamples = new ArrayList<>(); List totalFilesReceivedSamples = new ArrayList<>(); @@ -144,12 +150,13 @@ List getStreamInfoStats() { for (Map.Entry>> sessionResults : streamInfo.entrySet()) { String planId = sessionResults.getKey(); for (Map session : sessionResults.getValue()) { - ArrayList labelValues = - Lists.newArrayList( - planId, - session.get("STREAM_OPERATION"), - session.get("PEER"), - session.get("USING_CONNECTION")); + List labelValues = + Lists.newArrayListWithCapacity(filesToReceive.getLabelValues().size() + 4); + labelValues.addAll(filesToReceive.getLabelValues()); + labelValues.add(planId); + labelValues.add(session.get("STREAM_OPERATION")); + labelValues.add(session.get("PEER")); + labelValues.add(session.get("USING_CONNECTION")); long totalFilesToReceive = Long.parseLong(session.get("TOTAL_FILES_TO_RECEIVE")); long totalFilesReceived = Long.parseLong(session.get("TOTAL_FILES_RECEIVED")); @@ -276,17 +283,23 @@ List getStreamInfoStats() { sizeSentFamily); } + List> getCompactions() { + return ShimLoader.instance.get().getCompactionManager().getCompactions(); + } + List getCompactionStats() { // Cassandra's internal CompactionMetrics are close to what we want, but not exactly. // And we can't access CompactionManager.getMetrics() to get them in 3.11 List> compactions = - ShimLoader.instance.get().getCompactionManager().getCompactions().stream() + getCompactions().stream() .filter( c -> { String taskType = c.get("taskType"); try { OperationType operationType = OperationType.valueOf(taskType.toUpperCase()); + // Ignore taskTypes: COUNTER_CACHE_SAVE, KEY_CACHE_SAVE, ROW_CACHE_SAVE (from + // Cassandra 4.1) return operationType != OperationType.COUNTER_CACHE_SAVE && operationType != OperationType.KEY_CACHE_SAVE && operationType != OperationType.ROW_CACHE_SAVE; @@ -296,7 +309,6 @@ List getCompactionStats() { }) .collect(Collectors.toList()); - // Ignore taskTypes: COUNTER_CACHE_SAVE, KEY_CACHE_SAVE, ROW_CACHE_SAVE (from Cassandra 4.1) ArrayList additionalLabels = Lists.newArrayList("keyspace", "table", "compaction_id", "unit", "type"); @@ -312,13 +324,14 @@ List getCompactionStats() { List completedSamples = new ArrayList<>(compactions.size() * 2); List totalSamples = new ArrayList<>(compactions.size() * 2); for (Map c : compactions) { - ArrayList labelValues = - Lists.newArrayList( - c.get("keyspace"), - c.get("columnfamily"), - c.get("compactionId"), - c.get("unit"), - c.get("taskType")); + List labelValues = + Lists.newArrayListWithCapacity(protoCompleted.getLabelValues().size() + 5); + labelValues.addAll(protoCompleted.getLabelValues()); + labelValues.add(c.get("keyspace")); + labelValues.add(c.get("columnfamily")); + labelValues.add(c.get("compactionId")); + labelValues.add(c.get("unit")); + labelValues.add(c.get("taskType")); Collector.MetricFamilySamples.Sample completeSample = new Collector.MetricFamilySamples.Sample( diff --git a/management-api-agent-common/src/test/java/io/k8ssandra/metrics/prometheus/TaskExportsTests.java b/management-api-agent-common/src/test/java/io/k8ssandra/metrics/prometheus/TaskExportsTests.java new file mode 100644 index 00000000..62ed0f10 --- /dev/null +++ b/management-api-agent-common/src/test/java/io/k8ssandra/metrics/prometheus/TaskExportsTests.java @@ -0,0 +1,106 @@ +package io.k8ssandra.metrics.prometheus; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.codahale.metrics.MetricRegistry; +import com.google.common.collect.Lists; +import io.k8ssandra.metrics.config.Configuration; +import io.prometheus.client.Collector; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.cassandra.db.compaction.OperationType; +import org.junit.Test; +import org.mockito.Mockito; + +public class TaskExportsTests { + + @Test + public void testStreamInfoStats() { + MetricRegistry mockRegistry = mock(MetricRegistry.class); + Configuration config = new Configuration(); + CassandraTasksExports exports = new CassandraTasksExports(mockRegistry, config); + CassandraTasksExports spy = Mockito.spy(exports); + Mockito.doReturn(getStreamInfoMock()).when(spy).getStreamInfos(); + when(spy.getStreamInfoStats()).thenCallRealMethod(); + + List streamInfoStats = spy.getStreamInfoStats(); + assertEquals(8, streamInfoStats.size()); + assertEquals(10, streamInfoStats.get(0).samples.size()); + for (Collector.MetricFamilySamples streamInfoStat : streamInfoStats) { + for (Collector.MetricFamilySamples.Sample sample : streamInfoStat.samples) { + assertEquals(9, sample.labelNames.size()); + assertEquals(sample.labelNames.size(), sample.labelValues.size()); + } + } + } + + @Test + public void testCompactionStats() { + MetricRegistry mockRegistry = mock(MetricRegistry.class); + Configuration config = new Configuration(); + CassandraTasksExports exports = new CassandraTasksExports(mockRegistry, config); + CassandraTasksExports spy = Mockito.spy(exports); + Mockito.doReturn(getCompactionsMock()).when(spy).getCompactions(); + when(spy.getCompactionStats()).thenCallRealMethod(); + + assertEquals(2, spy.getCompactionStats().size()); + for (Collector.MetricFamilySamples compactionStat : spy.getCompactionStats()) { + assertEquals(1, compactionStat.samples.size()); + Collector.MetricFamilySamples.Sample sample = compactionStat.samples.get(0); + assertEquals(10, sample.labelNames.size()); + assertEquals(sample.labelValues.size(), sample.labelNames.size()); + } + } + + private List> getCompactionsMock() { + ArrayList> results = Lists.newArrayList(); + + Map ret = new HashMap<>(); + ret.put("id", ""); + ret.put("keyspace", "getKeyspace()"); + ret.put("columnfamily", "getColumnFamily()"); + ret.put("completed", "1"); + ret.put("total", "2"); + ret.put("taskType", OperationType.COMPACTION.name()); + ret.put("unit", "unit.toString()"); + ret.put("compactionId", "compactionId"); + + results.add(ret); + + return results; + } + + private List>>> getStreamInfoMock() { + List>>> result = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + Map>> streamInfo = new HashMap<>(); + List> sessionResults = new ArrayList<>(); + + Map sessionInfo = new HashMap<>(); + sessionInfo.put("STREAM_OPERATION", "testStreaming"); + sessionInfo.put("PEER", "127.0.0.1"); + sessionInfo.put("USING_CONNECTION", "127.0.0.1"); + sessionInfo.put("TOTAL_FILES_TO_RECEIVE", "10"); + sessionInfo.put("TOTAL_FILES_RECEIVED", "9"); + sessionInfo.put("TOTAL_SIZE_TO_RECEIVE", "128"); + sessionInfo.put("TOTAL_SIZE_RECEIVED", "127"); + + sessionInfo.put("TOTAL_FILES_TO_SEND", "5"); + sessionInfo.put("TOTAL_FILES_SENT", "4"); + sessionInfo.put("TOTAL_SIZE_TO_SEND", "512"); + sessionInfo.put("TOTAL_SIZE_SENT", "511"); + sessionResults.add(sessionInfo); + + streamInfo.put("123456", sessionResults); + + result.add(streamInfo); + } + + return result; + } +}