diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java index 9d1869c3..c4f35938 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/interceptors/MetricsInterceptor.java @@ -74,7 +74,7 @@ public static void intercept(@SuperCall Callable zuper) throws Exception { DefaultExports.initialize(); // Add task metrics - if(!config.isExtendedDisabled()) { + if (!config.isExtendedDisabled()) { new CassandraTasksExports(CassandraMetricsRegistry.Metrics, config).register(); } diff --git a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java index 71628bd1..bf871ced 100644 --- a/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java +++ b/management-api-agent-common/src/main/java/io/k8ssandra/metrics/prometheus/CassandraTasksExports.java @@ -27,6 +27,7 @@ public class CassandraTasksExports extends Collector implements Collector.Descri private static final org.slf4j.Logger logger = LoggerFactory.getLogger(CassandraTasksExports.class); + private static final String METRICS_PREFIX = "org_apache_cassandra_metrics_extended_"; private final MetricRegistry registry; private final CassandraMetricNameParser parser; @@ -45,6 +46,7 @@ public List collect() { familySamples.addAll(getCompactionStats()); // Collect active streaming sessions + familySamples.addAll(getStreamInfoStats()); // Collect other sstableOperations (if not part of Compactions metrics already) @@ -60,6 +62,196 @@ public List describe() { return new ArrayList<>(); } + List getStreamInfoStats() { + ArrayList additionalLabels = + Lists.newArrayList("plan_id", "operation", "peer", "connection"); + + // These should be EA targets, 8 metrics to create + CassandraMetricDefinition filesToReceive = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_files_to_receive", "", additionalLabels, null); + + CassandraMetricDefinition filesReceived = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_files_received", "", additionalLabels, null); + + CassandraMetricDefinition sizeToReceive = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_size_to_receive", "", additionalLabels, null); + + CassandraMetricDefinition sizeReceived = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_size_received", "", additionalLabels, null); + + CassandraMetricDefinition filesToSend = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_files_to_send", "", additionalLabels, null); + + CassandraMetricDefinition filesSent = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_files_sent", "", additionalLabels, null); + + CassandraMetricDefinition sizeToSend = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_size_to_send", "", additionalLabels, null); + + CassandraMetricDefinition sizeSent = + parser.parseDropwizardMetric( + METRICS_PREFIX + "streaming_total_size_sent", "", additionalLabels, null); + + // This is a lot simpler code without all the casting back and forth if description was in the + // same place for + // 3.11 and 4.x. Simplify this once 3.11 support is dropped to use + // StreamManager.instance.getCurrentStreams() .. + + List>>> streamInfos = + ShimLoader.instance.get().getStreamInfo(); + + List totalFilesToReceiveSamples = new ArrayList<>(); + List totalFilesReceivedSamples = new ArrayList<>(); + List totalSizeToReceiveSamples = new ArrayList<>(); + List totalSizeReceivedSamples = new ArrayList<>(); + List totalFilesToSendSamples = new ArrayList<>(); + List totalFilesSentSamples = new ArrayList<>(); + List totalSizeToSendSamples = new ArrayList<>(); + List totalSizeSentSamples = new ArrayList<>(); + + for (Map>> streamInfo : streamInfos) { + for (Map.Entry>> sessionResults : streamInfo.entrySet()) { + String planId = sessionResults.getKey(); + for (Map session : sessionResults.getValue()) { + ArrayList labelValues = + Lists.newArrayList( + planId, + session.get("STREAM_OPERATION"), + session.get("PEER"), + session.get("USING_CONNECTION")); + + long totalFilesToReceive = Long.parseLong(session.get("TOTAL_FILES_TO_RECEIVE")); + long totalFilesReceived = Long.parseLong(session.get("TOTAL_FILES_RECEIVED")); + long totalSizeToReceive = Long.parseLong(session.get("TOTAL_SIZE_TO_RECEIVE")); + long totalSizeReceived = Long.parseLong(session.get("TOTAL_SIZE_RECEIVED")); + long totalFilesToSend = Long.parseLong(session.get("TOTAL_FILES_TO_SEND")); + long totalFilesSent = Long.parseLong(session.get("TOTAL_FILES_SENT")); + long totalSizeToSend = Long.parseLong(session.get("TOTAL_SIZE_TO_SEND")); + long totalSizeSent = Long.parseLong(session.get("TOTAL_SIZE_SENT")); + + // Receive samples + Collector.MetricFamilySamples.Sample totalFilesToReceiveSample = + new Collector.MetricFamilySamples.Sample( + filesToReceive.getMetricName(), + filesToReceive.getLabelNames(), + labelValues, + totalFilesToReceive); + + totalFilesToReceiveSamples.add(totalFilesToReceiveSample); + + Collector.MetricFamilySamples.Sample totalFilesReceivedSample = + new Collector.MetricFamilySamples.Sample( + filesReceived.getMetricName(), + filesReceived.getLabelNames(), + labelValues, + totalFilesReceived); + + totalFilesReceivedSamples.add(totalFilesReceivedSample); + + Collector.MetricFamilySamples.Sample totalSizeToReceiveSample = + new Collector.MetricFamilySamples.Sample( + sizeToReceive.getMetricName(), + sizeToReceive.getLabelNames(), + labelValues, + totalSizeToReceive); + + totalSizeToReceiveSamples.add(totalSizeToReceiveSample); + + Collector.MetricFamilySamples.Sample totalSizeReceivedSample = + new Collector.MetricFamilySamples.Sample( + sizeReceived.getMetricName(), + sizeReceived.getLabelNames(), + labelValues, + totalSizeReceived); + + totalSizeReceivedSamples.add(totalSizeReceivedSample); + + // Send samples + Collector.MetricFamilySamples.Sample totalFilesToSendSample = + new Collector.MetricFamilySamples.Sample( + filesToSend.getMetricName(), + filesToSend.getLabelNames(), + labelValues, + totalFilesToSend); + + totalFilesToSendSamples.add(totalFilesToSendSample); + + Collector.MetricFamilySamples.Sample totalFilesSentSample = + new Collector.MetricFamilySamples.Sample( + filesSent.getMetricName(), + filesSent.getLabelNames(), + labelValues, + totalFilesSent); + + totalFilesSentSamples.add(totalFilesSentSample); + + Collector.MetricFamilySamples.Sample totalSizeToSendSample = + new Collector.MetricFamilySamples.Sample( + sizeToSend.getMetricName(), + sizeToSend.getLabelNames(), + labelValues, + totalSizeToSend); + + totalSizeToSendSamples.add(totalSizeToSendSample); + + Collector.MetricFamilySamples.Sample totalSizeSentSample = + new Collector.MetricFamilySamples.Sample( + sizeSent.getMetricName(), sizeSent.getLabelNames(), labelValues, totalSizeSent); + + totalSizeSentSamples.add(totalSizeSentSample); + } + } + } + + // Receive + MetricFamilySamples filesToReceiveFamily = + new MetricFamilySamples( + filesToReceive.getMetricName(), Type.GAUGE, "", totalFilesToReceiveSamples); + + MetricFamilySamples filesReceivedFamily = + new MetricFamilySamples( + filesReceived.getMetricName(), Type.GAUGE, "", totalFilesReceivedSamples); + + MetricFamilySamples sizeToReceiveFamily = + new MetricFamilySamples( + sizeToReceive.getMetricName(), Type.GAUGE, "", totalSizeToReceiveSamples); + + MetricFamilySamples sizeReceivedFamily = + new MetricFamilySamples( + sizeReceived.getMetricName(), Type.GAUGE, "", totalSizeReceivedSamples); + + // Send + MetricFamilySamples filesToSendFamily = + new MetricFamilySamples( + filesToSend.getMetricName(), Type.GAUGE, "", totalFilesToSendSamples); + + MetricFamilySamples filesSentFamily = + new MetricFamilySamples(filesSent.getMetricName(), Type.GAUGE, "", totalFilesSentSamples); + + MetricFamilySamples sizeToSendFamily = + new MetricFamilySamples(sizeToSend.getMetricName(), Type.GAUGE, "", totalSizeToSendSamples); + + MetricFamilySamples sizeSentFamily = + new MetricFamilySamples(sizeSent.getMetricName(), Type.GAUGE, "", totalSizeSentSamples); + + return Lists.newArrayList( + filesToReceiveFamily, + filesReceivedFamily, + sizeToReceiveFamily, + sizeReceivedFamily, + filesToSendFamily, + filesSentFamily, + sizeToSendFamily, + sizeSentFamily); + } + List getCompactionStats() { // Cassandra's internal CompactionMetrics are close to what we want, but not exactly. @@ -80,20 +272,14 @@ List getCompactionStats() { ArrayList additionalLabels = Lists.newArrayList("keyspace", "table", "compaction_id", "unit", "type"); - // These should be escape handled + // These should be EA targets.. CassandraMetricDefinition protoCompleted = parser.parseDropwizardMetric( - "org_apache_cassandra_metrics_extended_compaction_stats_completed", - "", - additionalLabels, - null); + METRICS_PREFIX + "compaction_stats_completed", "", additionalLabels, null); CassandraMetricDefinition protoTotal = parser.parseDropwizardMetric( - "org_apache_cassandra_metrics_extended_compaction_stats_total", - "", - additionalLabels, - null); + METRICS_PREFIX + "compaction_stats_total", "", additionalLabels, null); List completedSamples = new ArrayList<>(compactions.size() * 2); List totalSamples = new ArrayList<>(compactions.size() * 2);