Skip to content

Commit

Permalink
Updates how counters are incremented and kept track of in StreamingTr…
Browse files Browse the repository at this point in the history
…ipUpdateMetrics.

Also adds more details to the new producerMetrics flag, and renames it from detailedMetrics.
  • Loading branch information
eibakke committed Nov 5, 2024
1 parent 31e3bbd commit b6ac221
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public static SiriETUpdaterParameters create(String configRef, NodeAdapter c) {
.asBoolean(false),
HttpHeadersConfig.headers(c, V2_3),
c
.of("detailedMetrics")
.of("producerMetrics")
.since(V2_0)
.summary("If detailed metrics should be collected.")
.summary("If failure, success, and warning metrics should be collected per producer.")
.asBoolean(false)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public record SiriETUpdaterParameters(
Duration previewInterval,
boolean fuzzyTripMatching,
HttpHeaders httpRequestHeaders,
boolean detailedMetrics
boolean producerMetrics
)
implements
PollingGraphUpdaterParameters, UrlUpdaterParameters, SiriETHttpTripUpdateSource.Parameters {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public record SiriETGooglePubsubUpdaterParameters(
Duration reconnectPeriod,
Duration initialGetDataTimeout,
boolean fuzzyTripMatching,
boolean detailedMetrics
boolean producerMetrics
)
implements UrlUpdaterParameters {
public static Duration RECONNECT_PERIOD = Duration.ofSeconds(30);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ public interface UrlUpdaterParameters {
String configRef();
String feedId();

default boolean detailedMetrics() {
default boolean producerMetrics() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
import org.opentripplanner.updater.spi.UpdateError;
import org.opentripplanner.updater.spi.UpdateResult;
import org.opentripplanner.updater.spi.UpdateSuccess;
Expand All @@ -27,123 +23,58 @@
public class StreamingTripUpdateMetrics extends TripUpdateMetrics {

protected static final String METRICS_PREFIX = "streaming_trip_updates";
private final Counter successfulCounter;
private final Counter failureCounter;
private final Counter warningsCounter;
private final Map<UpdateError.UpdateErrorType, Counter> failuresByType = new HashMap<>();
private final Map<UpdateSuccess.WarningType, Counter> warningsByType = new HashMap<>();

private final boolean detailedMetrics;
private final Map<String, Counter> successesByProvider = new HashMap<>();
private final Map<Pair<String, UpdateError.UpdateErrorType>, Counter> failuresByTypeAndProvider = new HashMap<>();
private final boolean producerMetrics;

public StreamingTripUpdateMetrics(UrlUpdaterParameters parameters) {
super(parameters);
this.successfulCounter = getCounter("successful", "Total successfully applied trip updates");
this.failureCounter = getCounter("failed", "Total failed trip updates");
this.warningsCounter = getCounter("warnings", "Total warnings for successful trip updates");
this.detailedMetrics = parameters.detailedMetrics();
this.producerMetrics = parameters.producerMetrics();
}

public void setCounters(UpdateResult result) {
this.successfulCounter.increment(result.successful());
this.failureCounter.increment(result.failed());
this.warningsCounter.increment(result.warnings().size());

setFailures(result);
setWarnings(result);
if (detailedMetrics) {
setFailuresByTypeAndProvider(result);
setSuccessesByProvider(result);
}
incrementWarningCounts(result);
incrementFailureCounts(result);
incrementSuccessCounts(result);
}

private void setWarnings(UpdateResult result) {
private void incrementWarningCounts(UpdateResult result) {
for (var warningType : result.warnings()) {
var counter = warningsByType.get(warningType);
if (Objects.isNull(counter)) {
counter =
getCounter(
"warning_type",
"Total warnings by type generated by successful trip updates",
Tag.of("warningType", warningType.name())
);
warningsByType.put(warningType, counter);
}
counter.increment();
}
}

private void setFailures(UpdateResult result) {
for (var errorType : result.failures().keySet()) {
var counter = failuresByType.get(errorType);
if (Objects.isNull(counter)) {
counter =
getCounter(
"failure_type",
"Total failed trip updates by type",
Tag.of("errorType", errorType.name())
);
failuresByType.put(errorType, counter);
}
counter.increment(result.failures().get(errorType).size());
Tags tags = Tags.concat(baseTags, Tags.of("warningType", warningType.name()));
Counter
.builder(METRICS_PREFIX + "." + "warnings")
.description("Total warnings by type generated by successful trip updates")
.tags(tags)
.register(Metrics.globalRegistry)
.increment();
}
}

private void setFailuresByTypeAndProvider(UpdateResult result) {
Map<Pair<String, UpdateError.UpdateErrorType>, Long> failureCountByTypeAndProvider = result
.errors()
.stream()
.collect(
Collectors.groupingBy(
error -> Pair.of(error.producer(), error.errorType()),
Collectors.counting()
)
);

for (var entry : failureCountByTypeAndProvider.entrySet()) {
Counter counter = failuresByTypeAndProvider.get(entry.getKey());
if (Objects.isNull(counter)) {
counter =
getCounter(
"failures_by_type_and_provider",
"Total failed trip updates by type and provider",
Tag.of("provider", entry.getKey().getLeft()),
Tag.of("errorType", entry.getKey().getRight().name())
);
failuresByTypeAndProvider.put(entry.getKey(), counter);
private void incrementFailureCounts(UpdateResult result) {
for (UpdateError error : result.errors()) {
Tags tags = Tags.concat(baseTags, Tags.of("errorType", error.errorType().name()));
if (producerMetrics) {
tags = tags.and(Tag.of("producer", error.producer()));
}
counter.increment(entry.getValue());
Counter
.builder(METRICS_PREFIX + "." + "failed")
.description("Total failed trip updates")
.tags(tags)
.register(Metrics.globalRegistry)
.increment();
}
}

private void setSuccessesByProvider(UpdateResult result) {
Map<String, Long> successCountByProvider = result
.successes()
.stream()
.collect(Collectors.groupingBy(UpdateSuccess::producer, Collectors.counting()));

for (var entry : successCountByProvider.entrySet()) {
Counter counter = successesByProvider.get(entry.getKey());
if (Objects.isNull(counter)) {
counter =
getCounter(
"successes_by_provider",
"Total successful trip updates by producer",
Tag.of("producer", entry.getKey())
);
successesByProvider.put(entry.getKey(), counter);
private void incrementSuccessCounts(UpdateResult result) {
for (UpdateSuccess success : result.successes()) {
Tags tags = Tags.of(baseTags);
if (producerMetrics) {
tags = tags.and(Tag.of("producer", success.producer()));
}
counter.increment(entry.getValue());
Counter
.builder(METRICS_PREFIX + "." + "successful")
.description("Total successfully applied trip updates")
.tags(tags)
.register(Metrics.globalRegistry)
.increment();
}
}

private Counter getCounter(String name, String description, Tag... tags) {
var finalTags = Tags.concat(Arrays.stream(tags).toList(), baseTags);
return Counter
.builder(METRICS_PREFIX + "." + name)
.description(description)
.tags(finalTags)
.register(Metrics.globalRegistry);
}
}
2 changes: 1 addition & 1 deletion doc/user/sandbox/siri/SiriUpdater.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ To enable the SIRI updater you need to add it to the updaters section of the `ro
|--------------------------------|:---------------:|--------------------------------------------------------------------------------------------------------|:----------:|---------------|:-----:|
| type = "siri-et-updater" | `enum` | The type of the updater. | *Required* | | 1.5 |
| blockReadinessUntilInitialized | `boolean` | Whether catching up with the updates should block the readiness check from returning a 'ready' result. | *Optional* | `false` | 2.0 |
| detailedMetrics | `boolean` | If detailed metrics should be collected. | *Optional* | `false` | 2.0 |
| feedId | `string` | The ID of the feed to apply the updates to. | *Required* | | 2.0 |
| frequency | `duration` | How often the updates should be retrieved. | *Optional* | `"PT1M"` | 2.0 |
| fuzzyTripMatching | `boolean` | If the fuzzy trip matcher should be used to match trips. | *Optional* | `false` | 2.0 |
| previewInterval | `duration` | TODO | *Optional* | | 2.0 |
| producerMetrics | `boolean` | If failure, success, and warning metrics should be collected per producer. | *Optional* | `false` | 2.0 |
| requestorRef | `string` | The requester reference. | *Optional* | | 2.0 |
| timeout | `duration` | The HTTP timeout to download the updates. | *Optional* | `"PT15S"` | 2.0 |
| [url](#u__8__url) | `string` | The URL to send the HTTP requests to. | *Required* | | 2.0 |
Expand Down

0 comments on commit b6ac221

Please sign in to comment.