Skip to content

Commit

Permalink
Fix labelValues adder (defaultLabelValuse were overwritten), add test…
Browse files Browse the repository at this point in the history
…s to verify the methods do not error and have correct labelValues and labelNames sizes in the end results also.
  • Loading branch information
burmanm committed Oct 4, 2023
1 parent bcd0014 commit c558045
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 17 deletions.
6 changes: 6 additions & 0 deletions management-api-agent-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.5.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.codahale.metrics.MetricRegistry;
import com.datastax.mgmtapi.ShimLoader;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import io.k8ssandra.metrics.builder.CassandraMetricDefinition;
import io.k8ssandra.metrics.builder.CassandraMetricNameParser;
Expand Down Expand Up @@ -62,6 +63,12 @@ public List<MetricFamilySamples> describe() {
return new ArrayList<>();
}

// Exported here to allow easier testing
@VisibleForTesting
List<Map<String, List<Map<String, String>>>> getStreamInfos() {
return ShimLoader.instance.get().getStreamInfo();
}

List<MetricFamilySamples> getStreamInfoStats() {
ArrayList<String> additionalLabels =
Lists.newArrayList("plan_id", "operation", "peer", "connection");
Expand Down Expand Up @@ -128,8 +135,7 @@ List<MetricFamilySamples> getStreamInfoStats() {
// 3.11 and 4.x. Simplify this once 3.11 support is dropped to use
// StreamManager.instance.getCurrentStreams() ..

List<Map<String, List<Map<String, String>>>> streamInfos =
ShimLoader.instance.get().getStreamInfo();
List<Map<String, List<Map<String, String>>>> streamInfos = getStreamInfos();

List<MetricFamilySamples.Sample> totalFilesToReceiveSamples = new ArrayList<>();
List<MetricFamilySamples.Sample> totalFilesReceivedSamples = new ArrayList<>();
Expand All @@ -144,12 +150,13 @@ List<MetricFamilySamples> getStreamInfoStats() {
for (Map.Entry<String, List<Map<String, String>>> sessionResults : streamInfo.entrySet()) {
String planId = sessionResults.getKey();
for (Map<String, String> session : sessionResults.getValue()) {
ArrayList<String> labelValues =
Lists.newArrayList(
planId,
session.get("STREAM_OPERATION"),
session.get("PEER"),
session.get("USING_CONNECTION"));
List<String> labelValues =
Lists.newArrayListWithCapacity(filesToReceive.getLabelValues().size() + 4);
labelValues.addAll(filesToReceive.getLabelValues());
labelValues.add(planId);
labelValues.add(session.get("STREAM_OPERATION"));
labelValues.add(session.get("PEER"));
labelValues.add(session.get("USING_CONNECTION"));

long totalFilesToReceive = Long.parseLong(session.get("TOTAL_FILES_TO_RECEIVE"));
long totalFilesReceived = Long.parseLong(session.get("TOTAL_FILES_RECEIVED"));
Expand Down Expand Up @@ -276,17 +283,23 @@ List<MetricFamilySamples> getStreamInfoStats() {
sizeSentFamily);
}

List<Map<String, String>> getCompactions() {
return ShimLoader.instance.get().getCompactionManager().getCompactions();
}

List<MetricFamilySamples> getCompactionStats() {

// Cassandra's internal CompactionMetrics are close to what we want, but not exactly.
// And we can't access CompactionManager.getMetrics() to get them in 3.11
List<Map<String, String>> compactions =
ShimLoader.instance.get().getCompactionManager().getCompactions().stream()
getCompactions().stream()
.filter(
c -> {
String taskType = c.get("taskType");
try {
OperationType operationType = OperationType.valueOf(taskType.toUpperCase());
// Ignore taskTypes: COUNTER_CACHE_SAVE, KEY_CACHE_SAVE, ROW_CACHE_SAVE (from
// Cassandra 4.1)
return operationType != OperationType.COUNTER_CACHE_SAVE
&& operationType != OperationType.KEY_CACHE_SAVE
&& operationType != OperationType.ROW_CACHE_SAVE;
Expand All @@ -296,7 +309,6 @@ List<MetricFamilySamples> getCompactionStats() {
})
.collect(Collectors.toList());

// Ignore taskTypes: COUNTER_CACHE_SAVE, KEY_CACHE_SAVE, ROW_CACHE_SAVE (from Cassandra 4.1)
ArrayList<String> additionalLabels =
Lists.newArrayList("keyspace", "table", "compaction_id", "unit", "type");

Expand All @@ -312,13 +324,14 @@ List<MetricFamilySamples> getCompactionStats() {
List<MetricFamilySamples.Sample> completedSamples = new ArrayList<>(compactions.size() * 2);
List<MetricFamilySamples.Sample> totalSamples = new ArrayList<>(compactions.size() * 2);
for (Map<String, String> c : compactions) {
ArrayList<String> labelValues =
Lists.newArrayList(
c.get("keyspace"),
c.get("columnfamily"),
c.get("compactionId"),
c.get("unit"),
c.get("taskType"));
List<String> labelValues =
Lists.newArrayListWithCapacity(protoCompleted.getLabelValues().size() + 5);
labelValues.addAll(protoCompleted.getLabelValues());
labelValues.add(c.get("keyspace"));
labelValues.add(c.get("columnfamily"));
labelValues.add(c.get("compactionId"));
labelValues.add(c.get("unit"));
labelValues.add(c.get("taskType"));

Collector.MetricFamilySamples.Sample completeSample =
new Collector.MetricFamilySamples.Sample(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package io.k8ssandra.metrics.prometheus;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Lists;
import io.k8ssandra.metrics.config.Configuration;
import io.prometheus.client.Collector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.cassandra.db.compaction.OperationType;
import org.junit.Test;
import org.mockito.Mockito;

public class TaskExportsTests {

@Test
public void testStreamInfoStats() {
MetricRegistry mockRegistry = mock(MetricRegistry.class);
Configuration config = new Configuration();
CassandraTasksExports exports = new CassandraTasksExports(mockRegistry, config);
CassandraTasksExports spy = Mockito.spy(exports);
Mockito.doReturn(getStreamInfoMock()).when(spy).getStreamInfos();
when(spy.getStreamInfoStats()).thenCallRealMethod();

List<Collector.MetricFamilySamples> streamInfoStats = spy.getStreamInfoStats();
assertEquals(8, streamInfoStats.size());
assertEquals(10, streamInfoStats.get(0).samples.size());
for (Collector.MetricFamilySamples streamInfoStat : streamInfoStats) {
for (Collector.MetricFamilySamples.Sample sample : streamInfoStat.samples) {
assertEquals(9, sample.labelNames.size());
assertEquals(sample.labelNames.size(), sample.labelValues.size());
}
}
}

@Test
public void testCompactionStats() {
MetricRegistry mockRegistry = mock(MetricRegistry.class);
Configuration config = new Configuration();
CassandraTasksExports exports = new CassandraTasksExports(mockRegistry, config);
CassandraTasksExports spy = Mockito.spy(exports);
Mockito.doReturn(getCompactionsMock()).when(spy).getCompactions();
when(spy.getCompactionStats()).thenCallRealMethod();

assertEquals(2, spy.getCompactionStats().size());
for (Collector.MetricFamilySamples compactionStat : spy.getCompactionStats()) {
assertEquals(1, compactionStat.samples.size());
Collector.MetricFamilySamples.Sample sample = compactionStat.samples.get(0);
assertEquals(10, sample.labelNames.size());
assertEquals(sample.labelValues.size(), sample.labelNames.size());
}
}

private List<Map<String, String>> getCompactionsMock() {
ArrayList<Map<String, String>> results = Lists.newArrayList();

Map<String, String> ret = new HashMap<>();
ret.put("id", "");
ret.put("keyspace", "getKeyspace()");
ret.put("columnfamily", "getColumnFamily()");
ret.put("completed", "1");
ret.put("total", "2");
ret.put("taskType", OperationType.COMPACTION.name());
ret.put("unit", "unit.toString()");
ret.put("compactionId", "compactionId");

results.add(ret);

return results;
}

private List<Map<String, List<Map<String, String>>>> getStreamInfoMock() {
List<Map<String, List<Map<String, String>>>> result = new ArrayList<>();

for (int i = 0; i < 10; i++) {
Map<String, List<Map<String, String>>> streamInfo = new HashMap<>();
List<Map<String, String>> sessionResults = new ArrayList<>();

Map<String, String> sessionInfo = new HashMap<>();
sessionInfo.put("STREAM_OPERATION", "testStreaming");
sessionInfo.put("PEER", "127.0.0.1");
sessionInfo.put("USING_CONNECTION", "127.0.0.1");
sessionInfo.put("TOTAL_FILES_TO_RECEIVE", "10");
sessionInfo.put("TOTAL_FILES_RECEIVED", "9");
sessionInfo.put("TOTAL_SIZE_TO_RECEIVE", "128");
sessionInfo.put("TOTAL_SIZE_RECEIVED", "127");

sessionInfo.put("TOTAL_FILES_TO_SEND", "5");
sessionInfo.put("TOTAL_FILES_SENT", "4");
sessionInfo.put("TOTAL_SIZE_TO_SEND", "512");
sessionInfo.put("TOTAL_SIZE_SENT", "511");
sessionResults.add(sessionInfo);

streamInfo.put("123456", sessionResults);

result.add(streamInfo);
}

return result;
}
}

0 comments on commit c558045

Please sign in to comment.