diff --git a/client/java/src/main/java/com/linecorp/centraldogma/internal/client/AbstractWatcher.java b/client/java/src/main/java/com/linecorp/centraldogma/internal/client/AbstractWatcher.java index 8f83f83f1e..9b66707439 100644 --- a/client/java/src/main/java/com/linecorp/centraldogma/internal/client/AbstractWatcher.java +++ b/client/java/src/main/java/com/linecorp/centraldogma/internal/client/AbstractWatcher.java @@ -30,14 +30,13 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.AtomicDouble; import com.linecorp.centraldogma.client.CentralDogma; import com.linecorp.centraldogma.client.Latest; @@ -49,7 +48,7 @@ import com.linecorp.centraldogma.common.Revision; import com.linecorp.centraldogma.common.ShuttingDownException; -import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Gauge; abstract class AbstractWatcher implements Watcher { @@ -115,7 +114,7 @@ private enum State { private final List> updateListeners; private final AtomicReference state; private final CompletableFuture> initialValueFuture; - private final AtomicLong revisionGauge; + private final AtomicDouble latestNotifiedRevision = new AtomicDouble(); private volatile Latest latest; private volatile ScheduledFuture currentScheduleFuture; @@ -129,11 +128,12 @@ protected AbstractWatcher(CentralDogma client, ScheduledExecutorService executor this.repositoryName = requireNonNull(repositoryName, "repositoryName"); this.pathPattern = requireNonNull(pathPattern, "pathPattern"); - final Iterable tags = ImmutableList.of( - Tag.of("project", projectName), Tag.of("repository", repositoryName), - Tag.of("path", pathPattern)); - revisionGauge = client.meterRegistry().gauge("centraldogma.client.watcher.revision", - tags, new AtomicLong()); + Gauge.builder("centraldogma.client.watcher.revision", + this, watcher -> watcher.latestNotifiedRevision.get()) + .tag("project", projectName) + .tag("repository", repositoryName) + .tag("path", pathPattern) + .register(client.meterRegistry()); updateListeners = new CopyOnWriteArrayList<>(); state = new AtomicReference<>(State.INIT); @@ -247,7 +247,6 @@ private void doWatch(int numAttemptsSoFar) { if (oldLatest == null) { initialValueFuture.complete(newLatest); } - revisionGauge.set(latest.revision().major()); } // Watch again for the next change. @@ -300,14 +299,19 @@ private void notifyListeners() { } final Latest latest = this.latest; + boolean notifyFailed = false; for (BiConsumer listener : updateListeners) { try { listener.accept(latest.revision(), latest.value()); } catch (Exception e) { logger.warn("Exception thrown for watcher ({}/{}{}): rev={}", projectName, repositoryName, pathPattern, latest.revision(), e); + notifyFailed = true; } } + if (!notifyFailed) { + latestNotifiedRevision.set(latest.revision().major()); + } } private void handleEventLoopShutdown(RejectedExecutionException e) { diff --git a/it/src/test/java/com/linecorp/centraldogma/it/WatchTest.java b/it/src/test/java/com/linecorp/centraldogma/it/WatchTest.java index ab37483fa0..53a2c626f4 100644 --- a/it/src/test/java/com/linecorp/centraldogma/it/WatchTest.java +++ b/it/src/test/java/com/linecorp/centraldogma/it/WatchTest.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -37,6 +38,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.TextNode; +import com.linecorp.armeria.common.metric.MoreMeters; import com.linecorp.centraldogma.client.CentralDogma; import com.linecorp.centraldogma.client.Latest; import com.linecorp.centraldogma.client.Watcher; @@ -46,6 +48,8 @@ import com.linecorp.centraldogma.common.Query; import com.linecorp.centraldogma.common.Revision; +import io.micrometer.core.instrument.MeterRegistry; + class WatchTest { @RegisterExtension @@ -356,6 +360,49 @@ void transformingWatcher(ClientType clientType) throws InterruptedException { assertThat(heavyWatcher.latest().revision()).isEqualTo(rev3); } + @ParameterizedTest + @EnumSource(ClientType.class) + void watchFileMetrics(ClientType clientType) throws Exception { + revertTestFiles(clientType); + + final CentralDogma client = clientType.client(dogma); + final MeterRegistry registry = client.meterRegistry(); + + final String filePath = "/test/test2.json"; + final Watcher jsonWatcher = client.fileWatcher(dogma.project(), dogma.repo1(), + Query.ofJson(filePath)); + + // wait for initial value + final Revision rev0 = jsonWatcher.initialValueFuture().join().revision(); + await().untilAsserted(() -> assertThat(jsonWatcher.latestValue().at("/a").asText()) + .isEqualTo("apple")); + final double initialWatcherRev = getWatcherRevisionMetric(registry, dogma.project(), + dogma.repo1(), filePath); + + // update the json + final Change update = Change.ofJsonUpsert(filePath, "{ \"a\": \"air\" }"); + final PushResult res1 = client.push(dogma.project(), dogma.repo1(), rev0, "Modify /a", update).join(); + + // the notify complete revision is incremented + await().untilAsserted(() -> assertThat(jsonWatcher.latestValue().at("/a").asText()) + .isEqualTo("air")); + assertThat(getWatcherRevisionMetric(registry, dogma.project(), dogma.repo1(), filePath)) + .isEqualTo(initialWatcherRev + 1); + + jsonWatcher.watch(node -> { + throw new IllegalArgumentException(); + }); + + final Change update2 = Change.ofJsonUpsert(filePath, "{ \"a\": \"ant\" }"); + client.push(dogma.project(), dogma.repo1(), res1.revision(), "Modify /a", update2).join(); + + // the notify complete revision isn't incremented + await().untilAsserted(() -> assertThat(jsonWatcher.latestValue().at("/a").asText()) + .isEqualTo("ant")); + assertThat(getWatcherRevisionMetric(registry, dogma.project(), dogma.repo1(), filePath)) + .isEqualTo(initialWatcherRev + 1); + } + private static void revertTestFiles(ClientType clientType) { final Change change1 = Change.ofJsonUpsert("/test/test1.json", "[ 1, 2, 3 ]"); final Change change2 = Change.ofJsonUpsert("/test/test2.json", "{ \"a\": \"apple\" }"); @@ -369,4 +416,14 @@ private static void revertTestFiles(ClientType clientType) { "Revert test files", changes).join(); } } + + private static Double getWatcherRevisionMetric(MeterRegistry registry, String project, + String repo, String path) { + final String name = "centraldogma.client.watcher.revision#value{path=" + path + + ",project=" + project + ",repository=" + repo + '}'; + return MoreMeters.measureAll(registry).entrySet().stream() + .filter(e -> e.getKey().equals(name)) + .map(Map.Entry::getValue).findFirst() + .get(); + } } diff --git a/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java b/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java index 96bb1f85cf..019b345087 100644 --- a/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java +++ b/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java @@ -39,6 +39,7 @@ import com.linecorp.centraldogma.server.MirroringService; import com.linecorp.centraldogma.server.TlsConfig; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.netty.handler.ssl.util.SelfSignedCertificate; import io.netty.util.NetUtil; @@ -262,6 +263,8 @@ private void configureClientCommon(AbstractArmeriaCentralDogmaBuilder builder assert serverAddress != null; builder.host(serverAddress.getHostString(), serverAddress.getPort()); + builder.meterRegistry(new SimpleMeterRegistry()); + if (useTls) { builder.useTls(); builder.clientFactory(ClientFactory.insecure());