From b6ac221f7a60f073b656ba8299a50fd8f36c9aa7 Mon Sep 17 00:00:00 2001 From: eibakke Date: Tue, 5 Nov 2024 15:03:26 +0100 Subject: [PATCH] Updates how counters are incremented and kept track of in StreamingTripUpdateMetrics. Also adds more details to the new producerMetrics flag, and renames it from detailedMetrics. --- .../updaters/SiriETUpdaterConfig.java | 4 +- .../siri/updater/SiriETUpdaterParameters.java | 2 +- .../SiriETGooglePubsubUpdaterParameters.java | 2 +- .../updater/trip/UrlUpdaterParameters.java | 2 +- .../metrics/StreamingTripUpdateMetrics.java | 141 +++++------------- doc/user/sandbox/siri/SiriUpdater.md | 2 +- 6 files changed, 42 insertions(+), 111 deletions(-) diff --git a/application/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/SiriETUpdaterConfig.java b/application/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/SiriETUpdaterConfig.java index e5107607758..cb38bf0f4b7 100644 --- a/application/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/SiriETUpdaterConfig.java +++ b/application/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/SiriETUpdaterConfig.java @@ -45,9 +45,9 @@ public static SiriETUpdaterParameters create(String configRef, NodeAdapter c) { .asBoolean(false), HttpHeadersConfig.headers(c, V2_3), c - .of("detailedMetrics") + .of("producerMetrics") .since(V2_0) - .summary("If detailed metrics should be collected.") + .summary("If failure, success, and warning metrics should be collected per producer.") .asBoolean(false) ); } diff --git a/application/src/main/java/org/opentripplanner/updater/siri/updater/SiriETUpdaterParameters.java b/application/src/main/java/org/opentripplanner/updater/siri/updater/SiriETUpdaterParameters.java index d2d42e50374..dc479c034e1 100644 --- a/application/src/main/java/org/opentripplanner/updater/siri/updater/SiriETUpdaterParameters.java +++ b/application/src/main/java/org/opentripplanner/updater/siri/updater/SiriETUpdaterParameters.java @@ -16,7 +16,7 @@ public record SiriETUpdaterParameters( Duration previewInterval, boolean fuzzyTripMatching, HttpHeaders httpRequestHeaders, - boolean detailedMetrics + boolean producerMetrics ) implements PollingGraphUpdaterParameters, UrlUpdaterParameters, SiriETHttpTripUpdateSource.Parameters { diff --git a/application/src/main/java/org/opentripplanner/updater/siri/updater/google/SiriETGooglePubsubUpdaterParameters.java b/application/src/main/java/org/opentripplanner/updater/siri/updater/google/SiriETGooglePubsubUpdaterParameters.java index 6f68c8002b4..d7fb064966a 100644 --- a/application/src/main/java/org/opentripplanner/updater/siri/updater/google/SiriETGooglePubsubUpdaterParameters.java +++ b/application/src/main/java/org/opentripplanner/updater/siri/updater/google/SiriETGooglePubsubUpdaterParameters.java @@ -16,7 +16,7 @@ public record SiriETGooglePubsubUpdaterParameters( Duration reconnectPeriod, Duration initialGetDataTimeout, boolean fuzzyTripMatching, - boolean detailedMetrics + boolean producerMetrics ) implements UrlUpdaterParameters { public static Duration RECONNECT_PERIOD = Duration.ofSeconds(30); diff --git a/application/src/main/java/org/opentripplanner/updater/trip/UrlUpdaterParameters.java b/application/src/main/java/org/opentripplanner/updater/trip/UrlUpdaterParameters.java index 2df600bfa6e..4918be84e45 100644 --- a/application/src/main/java/org/opentripplanner/updater/trip/UrlUpdaterParameters.java +++ b/application/src/main/java/org/opentripplanner/updater/trip/UrlUpdaterParameters.java @@ -5,7 +5,7 @@ public interface UrlUpdaterParameters { String configRef(); String feedId(); - default boolean detailedMetrics() { + default boolean producerMetrics() { return false; } } diff --git a/application/src/main/java/org/opentripplanner/updater/trip/metrics/StreamingTripUpdateMetrics.java b/application/src/main/java/org/opentripplanner/updater/trip/metrics/StreamingTripUpdateMetrics.java index 39c639f491a..233099e19f0 100644 --- a/application/src/main/java/org/opentripplanner/updater/trip/metrics/StreamingTripUpdateMetrics.java +++ b/application/src/main/java/org/opentripplanner/updater/trip/metrics/StreamingTripUpdateMetrics.java @@ -5,11 +5,7 @@ import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; +import java.util.List; import org.opentripplanner.updater.spi.UpdateError; import org.opentripplanner.updater.spi.UpdateResult; import org.opentripplanner.updater.spi.UpdateSuccess; @@ -27,123 +23,58 @@ public class StreamingTripUpdateMetrics extends TripUpdateMetrics { protected static final String METRICS_PREFIX = "streaming_trip_updates"; - private final Counter successfulCounter; - private final Counter failureCounter; - private final Counter warningsCounter; - private final Map failuresByType = new HashMap<>(); - private final Map warningsByType = new HashMap<>(); - - private final boolean detailedMetrics; - private final Map successesByProvider = new HashMap<>(); - private final Map, Counter> failuresByTypeAndProvider = new HashMap<>(); + private final boolean producerMetrics; public StreamingTripUpdateMetrics(UrlUpdaterParameters parameters) { super(parameters); - this.successfulCounter = getCounter("successful", "Total successfully applied trip updates"); - this.failureCounter = getCounter("failed", "Total failed trip updates"); - this.warningsCounter = getCounter("warnings", "Total warnings for successful trip updates"); - this.detailedMetrics = parameters.detailedMetrics(); + this.producerMetrics = parameters.producerMetrics(); } public void setCounters(UpdateResult result) { - this.successfulCounter.increment(result.successful()); - this.failureCounter.increment(result.failed()); - this.warningsCounter.increment(result.warnings().size()); - - setFailures(result); - setWarnings(result); - if (detailedMetrics) { - setFailuresByTypeAndProvider(result); - setSuccessesByProvider(result); - } + incrementWarningCounts(result); + incrementFailureCounts(result); + incrementSuccessCounts(result); } - private void setWarnings(UpdateResult result) { + private void incrementWarningCounts(UpdateResult result) { for (var warningType : result.warnings()) { - var counter = warningsByType.get(warningType); - if (Objects.isNull(counter)) { - counter = - getCounter( - "warning_type", - "Total warnings by type generated by successful trip updates", - Tag.of("warningType", warningType.name()) - ); - warningsByType.put(warningType, counter); - } - counter.increment(); - } - } - - private void setFailures(UpdateResult result) { - for (var errorType : result.failures().keySet()) { - var counter = failuresByType.get(errorType); - if (Objects.isNull(counter)) { - counter = - getCounter( - "failure_type", - "Total failed trip updates by type", - Tag.of("errorType", errorType.name()) - ); - failuresByType.put(errorType, counter); - } - counter.increment(result.failures().get(errorType).size()); + Tags tags = Tags.concat(baseTags, Tags.of("warningType", warningType.name())); + Counter + .builder(METRICS_PREFIX + "." + "warnings") + .description("Total warnings by type generated by successful trip updates") + .tags(tags) + .register(Metrics.globalRegistry) + .increment(); } } - private void setFailuresByTypeAndProvider(UpdateResult result) { - Map, Long> failureCountByTypeAndProvider = result - .errors() - .stream() - .collect( - Collectors.groupingBy( - error -> Pair.of(error.producer(), error.errorType()), - Collectors.counting() - ) - ); - - for (var entry : failureCountByTypeAndProvider.entrySet()) { - Counter counter = failuresByTypeAndProvider.get(entry.getKey()); - if (Objects.isNull(counter)) { - counter = - getCounter( - "failures_by_type_and_provider", - "Total failed trip updates by type and provider", - Tag.of("provider", entry.getKey().getLeft()), - Tag.of("errorType", entry.getKey().getRight().name()) - ); - failuresByTypeAndProvider.put(entry.getKey(), counter); + private void incrementFailureCounts(UpdateResult result) { + for (UpdateError error : result.errors()) { + Tags tags = Tags.concat(baseTags, Tags.of("errorType", error.errorType().name())); + if (producerMetrics) { + tags = tags.and(Tag.of("producer", error.producer())); } - counter.increment(entry.getValue()); + Counter + .builder(METRICS_PREFIX + "." + "failed") + .description("Total failed trip updates") + .tags(tags) + .register(Metrics.globalRegistry) + .increment(); } } - private void setSuccessesByProvider(UpdateResult result) { - Map successCountByProvider = result - .successes() - .stream() - .collect(Collectors.groupingBy(UpdateSuccess::producer, Collectors.counting())); - - for (var entry : successCountByProvider.entrySet()) { - Counter counter = successesByProvider.get(entry.getKey()); - if (Objects.isNull(counter)) { - counter = - getCounter( - "successes_by_provider", - "Total successful trip updates by producer", - Tag.of("producer", entry.getKey()) - ); - successesByProvider.put(entry.getKey(), counter); + private void incrementSuccessCounts(UpdateResult result) { + for (UpdateSuccess success : result.successes()) { + Tags tags = Tags.of(baseTags); + if (producerMetrics) { + tags = tags.and(Tag.of("producer", success.producer())); } - counter.increment(entry.getValue()); + Counter + .builder(METRICS_PREFIX + "." + "successful") + .description("Total successfully applied trip updates") + .tags(tags) + .register(Metrics.globalRegistry) + .increment(); } } - - private Counter getCounter(String name, String description, Tag... tags) { - var finalTags = Tags.concat(Arrays.stream(tags).toList(), baseTags); - return Counter - .builder(METRICS_PREFIX + "." + name) - .description(description) - .tags(finalTags) - .register(Metrics.globalRegistry); - } } diff --git a/doc/user/sandbox/siri/SiriUpdater.md b/doc/user/sandbox/siri/SiriUpdater.md index f63316974f4..5666f41885c 100644 --- a/doc/user/sandbox/siri/SiriUpdater.md +++ b/doc/user/sandbox/siri/SiriUpdater.md @@ -31,11 +31,11 @@ To enable the SIRI updater you need to add it to the updaters section of the `ro |--------------------------------|:---------------:|--------------------------------------------------------------------------------------------------------|:----------:|---------------|:-----:| | type = "siri-et-updater" | `enum` | The type of the updater. | *Required* | | 1.5 | | blockReadinessUntilInitialized | `boolean` | Whether catching up with the updates should block the readiness check from returning a 'ready' result. | *Optional* | `false` | 2.0 | -| detailedMetrics | `boolean` | If detailed metrics should be collected. | *Optional* | `false` | 2.0 | | feedId | `string` | The ID of the feed to apply the updates to. | *Required* | | 2.0 | | frequency | `duration` | How often the updates should be retrieved. | *Optional* | `"PT1M"` | 2.0 | | fuzzyTripMatching | `boolean` | If the fuzzy trip matcher should be used to match trips. | *Optional* | `false` | 2.0 | | previewInterval | `duration` | TODO | *Optional* | | 2.0 | +| producerMetrics | `boolean` | If failure, success, and warning metrics should be collected per producer. | *Optional* | `false` | 2.0 | | requestorRef | `string` | The requester reference. | *Optional* | | 2.0 | | timeout | `duration` | The HTTP timeout to download the updates. | *Optional* | `"PT15S"` | 2.0 | | [url](#u__8__url) | `string` | The URL to send the HTTP requests to. | *Required* | | 2.0 |