Skip to content

Commit

Permalink
add it test, update revision when notify succeeds
Browse files Browse the repository at this point in the history
  • Loading branch information
jrhee17 committed Apr 16, 2021
1 parent 94d6a17 commit e32a68e
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,13 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AtomicDouble;

import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.client.Latest;
Expand All @@ -49,7 +48,7 @@
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.common.ShuttingDownException;

import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Gauge;

abstract class AbstractWatcher<T> implements Watcher<T> {

Expand Down Expand Up @@ -115,7 +114,7 @@ private enum State {
private final List<BiConsumer<? super Revision, ? super T>> updateListeners;
private final AtomicReference<State> state;
private final CompletableFuture<Latest<T>> initialValueFuture;
private final AtomicLong revisionGauge;
private final AtomicDouble latestNotifiedRevision = new AtomicDouble();

private volatile Latest<T> latest;
private volatile ScheduledFuture<?> currentScheduleFuture;
Expand All @@ -129,11 +128,12 @@ protected AbstractWatcher(CentralDogma client, ScheduledExecutorService executor
this.repositoryName = requireNonNull(repositoryName, "repositoryName");
this.pathPattern = requireNonNull(pathPattern, "pathPattern");

final Iterable<Tag> tags = ImmutableList.of(
Tag.of("project", projectName), Tag.of("repository", repositoryName),
Tag.of("path", pathPattern));
revisionGauge = client.meterRegistry().gauge("centraldogma.client.watcher.revision",
tags, new AtomicLong());
Gauge.builder("centraldogma.client.watcher.revision",
this, watcher -> watcher.latestNotifiedRevision.get())
.tag("project", projectName)
.tag("repository", repositoryName)
.tag("path", pathPattern)
.register(client.meterRegistry());

updateListeners = new CopyOnWriteArrayList<>();
state = new AtomicReference<>(State.INIT);
Expand Down Expand Up @@ -247,7 +247,6 @@ private void doWatch(int numAttemptsSoFar) {
if (oldLatest == null) {
initialValueFuture.complete(newLatest);
}
revisionGauge.set(latest.revision().major());
}

// Watch again for the next change.
Expand Down Expand Up @@ -300,14 +299,19 @@ private void notifyListeners() {
}

final Latest<T> latest = this.latest;
boolean notifyFailed = false;
for (BiConsumer<? super Revision, ? super T> listener : updateListeners) {
try {
listener.accept(latest.revision(), latest.value());
} catch (Exception e) {
logger.warn("Exception thrown for watcher ({}/{}{}): rev={}",
projectName, repositoryName, pathPattern, latest.revision(), e);
notifyFailed = true;
}
}
if (!notifyFailed) {
latestNotifiedRevision.set(latest.revision().major());
}
}

private void handleEventLoopShutdown(RejectedExecutionException e) {
Expand Down
57 changes: 57 additions & 0 deletions it/src/test/java/com/linecorp/centraldogma/it/WatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -37,6 +38,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.TextNode;

import com.linecorp.armeria.common.metric.MoreMeters;
import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.client.Latest;
import com.linecorp.centraldogma.client.Watcher;
Expand All @@ -46,6 +48,8 @@
import com.linecorp.centraldogma.common.Query;
import com.linecorp.centraldogma.common.Revision;

import io.micrometer.core.instrument.MeterRegistry;

class WatchTest {

@RegisterExtension
Expand Down Expand Up @@ -356,6 +360,49 @@ void transformingWatcher(ClientType clientType) throws InterruptedException {
assertThat(heavyWatcher.latest().revision()).isEqualTo(rev3);
}

@ParameterizedTest
@EnumSource(ClientType.class)
void watchFileMetrics(ClientType clientType) throws Exception {
revertTestFiles(clientType);

final CentralDogma client = clientType.client(dogma);
final MeterRegistry registry = client.meterRegistry();

final String filePath = "/test/test2.json";
final Watcher<JsonNode> jsonWatcher = client.fileWatcher(dogma.project(), dogma.repo1(),
Query.ofJson(filePath));

// wait for initial value
final Revision rev0 = jsonWatcher.initialValueFuture().join().revision();
await().untilAsserted(() -> assertThat(jsonWatcher.latestValue().at("/a").asText())
.isEqualTo("apple"));
final double initialWatcherRev = getWatcherRevisionMetric(registry, dogma.project(),
dogma.repo1(), filePath);

// update the json
final Change<JsonNode> update = Change.ofJsonUpsert(filePath, "{ \"a\": \"air\" }");
final PushResult res1 = client.push(dogma.project(), dogma.repo1(), rev0, "Modify /a", update).join();

// the notify complete revision is incremented
await().untilAsserted(() -> assertThat(jsonWatcher.latestValue().at("/a").asText())
.isEqualTo("air"));
assertThat(getWatcherRevisionMetric(registry, dogma.project(), dogma.repo1(), filePath))
.isEqualTo(initialWatcherRev + 1);

jsonWatcher.watch(node -> {
throw new IllegalArgumentException();
});

final Change<JsonNode> update2 = Change.ofJsonUpsert(filePath, "{ \"a\": \"ant\" }");
client.push(dogma.project(), dogma.repo1(), res1.revision(), "Modify /a", update2).join();

// the notify complete revision isn't incremented
await().untilAsserted(() -> assertThat(jsonWatcher.latestValue().at("/a").asText())
.isEqualTo("ant"));
assertThat(getWatcherRevisionMetric(registry, dogma.project(), dogma.repo1(), filePath))
.isEqualTo(initialWatcherRev + 1);
}

private static void revertTestFiles(ClientType clientType) {
final Change<JsonNode> change1 = Change.ofJsonUpsert("/test/test1.json", "[ 1, 2, 3 ]");
final Change<JsonNode> change2 = Change.ofJsonUpsert("/test/test2.json", "{ \"a\": \"apple\" }");
Expand All @@ -369,4 +416,14 @@ private static void revertTestFiles(ClientType clientType) {
"Revert test files", changes).join();
}
}

private static Double getWatcherRevisionMetric(MeterRegistry registry, String project,
String repo, String path) {
final String name = "centraldogma.client.watcher.revision#value{path=" + path +
",project=" + project + ",repository=" + repo + '}';
return MoreMeters.measureAll(registry).entrySet().stream()
.filter(e -> e.getKey().equals(name))
.map(Map.Entry::getValue).findFirst()
.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.linecorp.centraldogma.server.MirroringService;
import com.linecorp.centraldogma.server.TlsConfig;

import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.NetUtil;

Expand Down Expand Up @@ -262,6 +263,8 @@ private void configureClientCommon(AbstractArmeriaCentralDogmaBuilder<?> builder
assert serverAddress != null;
builder.host(serverAddress.getHostString(), serverAddress.getPort());

builder.meterRegistry(new SimpleMeterRegistry());

if (useTls) {
builder.useTls();
builder.clientFactory(ClientFactory.insecure());
Expand Down

0 comments on commit e32a68e

Please sign in to comment.