Skip to content

Commit

Permalink
Add StreamInfo metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
burmanm committed Aug 31, 2023
1 parent 781b5fa commit 7da5c26
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static void intercept(@SuperCall Callable<Void> zuper) throws Exception {
DefaultExports.initialize();

// Add task metrics
if(!config.isExtendedDisabled()) {
if (!config.isExtendedDisabled()) {
new CassandraTasksExports(CassandraMetricsRegistry.Metrics, config).register();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class CassandraTasksExports extends Collector implements Collector.Descri
private static final org.slf4j.Logger logger =
LoggerFactory.getLogger(CassandraTasksExports.class);

private static final String METRICS_PREFIX = "org_apache_cassandra_metrics_extended_";
private final MetricRegistry registry;

private final CassandraMetricNameParser parser;
Expand All @@ -45,6 +46,7 @@ public List<MetricFamilySamples> collect() {
familySamples.addAll(getCompactionStats());

// Collect active streaming sessions
familySamples.addAll(getStreamInfoStats());

// Collect other sstableOperations (if not part of Compactions metrics already)

Expand All @@ -60,6 +62,196 @@ public List<MetricFamilySamples> describe() {
return new ArrayList<>();
}

List<MetricFamilySamples> getStreamInfoStats() {
ArrayList<String> additionalLabels =
Lists.newArrayList("plan_id", "operation", "peer", "connection");

// These should be EA targets, 8 metrics to create
CassandraMetricDefinition filesToReceive =
parser.parseDropwizardMetric(
METRICS_PREFIX + "streaming_total_files_to_receive", "", additionalLabels, null);

CassandraMetricDefinition filesReceived =
parser.parseDropwizardMetric(
METRICS_PREFIX + "streaming_total_files_received", "", additionalLabels, null);

CassandraMetricDefinition sizeToReceive =
parser.parseDropwizardMetric(
METRICS_PREFIX + "streaming_total_size_to_receive", "", additionalLabels, null);

CassandraMetricDefinition sizeReceived =
parser.parseDropwizardMetric(
METRICS_PREFIX + "streaming_total_size_received", "", additionalLabels, null);

CassandraMetricDefinition filesToSend =
parser.parseDropwizardMetric(
METRICS_PREFIX + "streaming_total_files_to_send", "", additionalLabels, null);

CassandraMetricDefinition filesSent =
parser.parseDropwizardMetric(
METRICS_PREFIX + "streaming_total_files_sent", "", additionalLabels, null);

CassandraMetricDefinition sizeToSend =
parser.parseDropwizardMetric(
METRICS_PREFIX + "streaming_total_size_to_send", "", additionalLabels, null);

CassandraMetricDefinition sizeSent =
parser.parseDropwizardMetric(
METRICS_PREFIX + "streaming_total_size_sent", "", additionalLabels, null);

// This is a lot simpler code without all the casting back and forth if description was in the
// same place for
// 3.11 and 4.x. Simplify this once 3.11 support is dropped to use
// StreamManager.instance.getCurrentStreams() ..

List<Map<String, List<Map<String, String>>>> streamInfos =
ShimLoader.instance.get().getStreamInfo();

List<MetricFamilySamples.Sample> totalFilesToReceiveSamples = new ArrayList<>();
List<MetricFamilySamples.Sample> totalFilesReceivedSamples = new ArrayList<>();
List<MetricFamilySamples.Sample> totalSizeToReceiveSamples = new ArrayList<>();
List<MetricFamilySamples.Sample> totalSizeReceivedSamples = new ArrayList<>();
List<MetricFamilySamples.Sample> totalFilesToSendSamples = new ArrayList<>();
List<MetricFamilySamples.Sample> totalFilesSentSamples = new ArrayList<>();
List<MetricFamilySamples.Sample> totalSizeToSendSamples = new ArrayList<>();
List<MetricFamilySamples.Sample> totalSizeSentSamples = new ArrayList<>();

for (Map<String, List<Map<String, String>>> streamInfo : streamInfos) {
for (Map.Entry<String, List<Map<String, String>>> sessionResults : streamInfo.entrySet()) {
String planId = sessionResults.getKey();
for (Map<String, String> session : sessionResults.getValue()) {
ArrayList<String> labelValues =
Lists.newArrayList(
planId,
session.get("STREAM_OPERATION"),
session.get("PEER"),
session.get("USING_CONNECTION"));

long totalFilesToReceive = Long.parseLong(session.get("TOTAL_FILES_TO_RECEIVE"));
long totalFilesReceived = Long.parseLong(session.get("TOTAL_FILES_RECEIVED"));
long totalSizeToReceive = Long.parseLong(session.get("TOTAL_SIZE_TO_RECEIVE"));
long totalSizeReceived = Long.parseLong(session.get("TOTAL_SIZE_RECEIVED"));
long totalFilesToSend = Long.parseLong(session.get("TOTAL_FILES_TO_SEND"));
long totalFilesSent = Long.parseLong(session.get("TOTAL_FILES_SENT"));
long totalSizeToSend = Long.parseLong(session.get("TOTAL_SIZE_TO_SEND"));
long totalSizeSent = Long.parseLong(session.get("TOTAL_SIZE_SENT"));

// Receive samples
Collector.MetricFamilySamples.Sample totalFilesToReceiveSample =
new Collector.MetricFamilySamples.Sample(
filesToReceive.getMetricName(),
filesToReceive.getLabelNames(),
labelValues,
totalFilesToReceive);

totalFilesToReceiveSamples.add(totalFilesToReceiveSample);

Collector.MetricFamilySamples.Sample totalFilesReceivedSample =
new Collector.MetricFamilySamples.Sample(
filesReceived.getMetricName(),
filesReceived.getLabelNames(),
labelValues,
totalFilesReceived);

totalFilesReceivedSamples.add(totalFilesReceivedSample);

Collector.MetricFamilySamples.Sample totalSizeToReceiveSample =
new Collector.MetricFamilySamples.Sample(
sizeToReceive.getMetricName(),
sizeToReceive.getLabelNames(),
labelValues,
totalSizeToReceive);

totalSizeToReceiveSamples.add(totalSizeToReceiveSample);

Collector.MetricFamilySamples.Sample totalSizeReceivedSample =
new Collector.MetricFamilySamples.Sample(
sizeReceived.getMetricName(),
sizeReceived.getLabelNames(),
labelValues,
totalSizeReceived);

totalSizeReceivedSamples.add(totalSizeReceivedSample);

// Send samples
Collector.MetricFamilySamples.Sample totalFilesToSendSample =
new Collector.MetricFamilySamples.Sample(
filesToSend.getMetricName(),
filesToSend.getLabelNames(),
labelValues,
totalFilesToSend);

totalFilesToSendSamples.add(totalFilesToSendSample);

Collector.MetricFamilySamples.Sample totalFilesSentSample =
new Collector.MetricFamilySamples.Sample(
filesSent.getMetricName(),
filesSent.getLabelNames(),
labelValues,
totalFilesSent);

totalFilesSentSamples.add(totalFilesSentSample);

Collector.MetricFamilySamples.Sample totalSizeToSendSample =
new Collector.MetricFamilySamples.Sample(
sizeToSend.getMetricName(),
sizeToSend.getLabelNames(),
labelValues,
totalSizeToSend);

totalSizeToSendSamples.add(totalSizeToSendSample);

Collector.MetricFamilySamples.Sample totalSizeSentSample =
new Collector.MetricFamilySamples.Sample(
sizeSent.getMetricName(), sizeSent.getLabelNames(), labelValues, totalSizeSent);

totalSizeSentSamples.add(totalSizeSentSample);
}
}
}

// Receive
MetricFamilySamples filesToReceiveFamily =
new MetricFamilySamples(
filesToReceive.getMetricName(), Type.GAUGE, "", totalFilesToReceiveSamples);

MetricFamilySamples filesReceivedFamily =
new MetricFamilySamples(
filesReceived.getMetricName(), Type.GAUGE, "", totalFilesReceivedSamples);

MetricFamilySamples sizeToReceiveFamily =
new MetricFamilySamples(
sizeToReceive.getMetricName(), Type.GAUGE, "", totalSizeToReceiveSamples);

MetricFamilySamples sizeReceivedFamily =
new MetricFamilySamples(
sizeReceived.getMetricName(), Type.GAUGE, "", totalSizeReceivedSamples);

// Send
MetricFamilySamples filesToSendFamily =
new MetricFamilySamples(
filesToSend.getMetricName(), Type.GAUGE, "", totalFilesToSendSamples);

MetricFamilySamples filesSentFamily =
new MetricFamilySamples(filesSent.getMetricName(), Type.GAUGE, "", totalFilesSentSamples);

MetricFamilySamples sizeToSendFamily =
new MetricFamilySamples(sizeToSend.getMetricName(), Type.GAUGE, "", totalSizeToSendSamples);

MetricFamilySamples sizeSentFamily =
new MetricFamilySamples(sizeSent.getMetricName(), Type.GAUGE, "", totalSizeSentSamples);

return Lists.newArrayList(
filesToReceiveFamily,
filesReceivedFamily,
sizeToReceiveFamily,
sizeReceivedFamily,
filesToSendFamily,
filesSentFamily,
sizeToSendFamily,
sizeSentFamily);
}

List<MetricFamilySamples> getCompactionStats() {

// Cassandra's internal CompactionMetrics are close to what we want, but not exactly.
Expand All @@ -80,20 +272,14 @@ List<MetricFamilySamples> getCompactionStats() {
ArrayList<String> additionalLabels =
Lists.newArrayList("keyspace", "table", "compaction_id", "unit", "type");

// These should be escape handled
// These should be EA targets..
CassandraMetricDefinition protoCompleted =
parser.parseDropwizardMetric(
"org_apache_cassandra_metrics_extended_compaction_stats_completed",
"",
additionalLabels,
null);
METRICS_PREFIX + "compaction_stats_completed", "", additionalLabels, null);

CassandraMetricDefinition protoTotal =
parser.parseDropwizardMetric(
"org_apache_cassandra_metrics_extended_compaction_stats_total",
"",
additionalLabels,
null);
METRICS_PREFIX + "compaction_stats_total", "", additionalLabels, null);

List<MetricFamilySamples.Sample> completedSamples = new ArrayList<>(compactions.size() * 2);
List<MetricFamilySamples.Sample> totalSamples = new ArrayList<>(compactions.size() * 2);
Expand Down

0 comments on commit 7da5c26

Please sign in to comment.