Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce client-side micrometer, start with exposing watcher revision metrics #542

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,15 @@
import com.linecorp.centraldogma.internal.thrift.WatchFileResult;
import com.linecorp.centraldogma.internal.thrift.WatchRepositoryResult;

import io.micrometer.core.instrument.MeterRegistry;

final class LegacyCentralDogma extends AbstractCentralDogma {

private final CentralDogmaService.AsyncIface client;

LegacyCentralDogma(ScheduledExecutorService blockingTaskExecutor, CentralDogmaService.AsyncIface client) {
super(blockingTaskExecutor);
LegacyCentralDogma(ScheduledExecutorService blockingTaskExecutor, CentralDogmaService.AsyncIface client,
MeterRegistry meterRegistry) {
super(blockingTaskExecutor, meterRegistry);
this.client = requireNonNull(client, "client");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import com.linecorp.centraldogma.internal.client.ReplicationLagTolerantCentralDogma;
import com.linecorp.centraldogma.internal.thrift.CentralDogmaService.AsyncIface;

import io.micrometer.core.instrument.MeterRegistry;

/**
* Builds a legacy {@link CentralDogma} client based on Thrift.
*
Expand Down Expand Up @@ -67,8 +69,10 @@ public CentralDogma build() throws UnknownHostException {
final ScheduledExecutorService blockingTaskExecutor = blockingTaskExecutor();

final int maxRetriesOnReplicationLag = maxNumRetriesOnReplicationLag();
final MeterRegistry meterRegistry = metricsEnabled() ? clientFactory().meterRegistry() : null;
final CentralDogma dogma = new LegacyCentralDogma(blockingTaskExecutor,
builder.build(AsyncIface.class));
builder.build(AsyncIface.class),
meterRegistry);
if (maxRetriesOnReplicationLag <= 0) {
return dogma;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.collect.ImmutableSet;

import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.metric.NoopMeterRegistry;
import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.client.RepositoryInfo;
import com.linecorp.centraldogma.client.armeria.legacy.ThriftTypes.TAuthor;
Expand Down Expand Up @@ -80,7 +81,8 @@ class LegacyCentralDogmaTest {

@BeforeEach
void setUp() {
client = new LegacyCentralDogma(CommonPools.blockingTaskExecutor(), iface);
client = new LegacyCentralDogma(CommonPools.blockingTaskExecutor(), iface,
NoopMeterRegistry.get());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@
import com.linecorp.centraldogma.internal.Util;
import com.linecorp.centraldogma.internal.api.v1.WatchTimeout;

import io.micrometer.core.instrument.MeterRegistry;

final class ArmeriaCentralDogma extends AbstractCentralDogma {

private static final MediaType JSON_PATCH_UTF8 = MediaType.JSON_PATCH.withCharset(StandardCharsets.UTF_8);
Expand Down Expand Up @@ -134,8 +136,9 @@ final class ArmeriaCentralDogma extends AbstractCentralDogma {
private final WebClient client;
private final String authorization;

ArmeriaCentralDogma(ScheduledExecutorService blockingTaskExecutor, WebClient client, String accessToken) {
super(blockingTaskExecutor);
ArmeriaCentralDogma(ScheduledExecutorService blockingTaskExecutor, WebClient client, String accessToken,
@Nullable MeterRegistry meterRegistry) {
super(blockingTaskExecutor, meterRegistry);
this.client = requireNonNull(client, "client");
authorization = "Bearer " + requireNonNull(accessToken, "accessToken");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.internal.client.ReplicationLagTolerantCentralDogma;

import io.micrometer.core.instrument.MeterRegistry;

/**
* Builds a {@link CentralDogma} client based on an <a href="https://line.github.io/armeria/">Armeria</a>
* HTTP client.
Expand All @@ -47,10 +49,10 @@ public CentralDogma build() throws UnknownHostException {
// TODO(ikhoon): Apply ExecutorServiceMetrics for the 'blockingTaskExecutor' once
// https://github.com/line/centraldogma/pull/542 is merged.
final ScheduledExecutorService blockingTaskExecutor = blockingTaskExecutor();

final MeterRegistry meterRegistry = metricsEnabled() ? clientFactory().meterRegistry() : null;
final CentralDogma dogma = new ArmeriaCentralDogma(blockingTaskExecutor,
builder.build(WebClient.class),
accessToken());
accessToken(), meterRegistry);
if (maxRetriesOnReplicationLag <= 0) {
return dogma;
} else {
Expand Down
1 change: 1 addition & 0 deletions client/java/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
dependencies {
implementation 'org.javassist:javassist'
implementation 'io.micrometer:micrometer-core'
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;

import javax.annotation.Nullable;

import com.linecorp.centraldogma.common.Author;
import com.linecorp.centraldogma.common.Change;
import com.linecorp.centraldogma.common.Commit;
Expand All @@ -37,12 +39,16 @@
import com.linecorp.centraldogma.internal.client.FileWatcher;
import com.linecorp.centraldogma.internal.client.RepositoryWatcher;

import io.micrometer.core.instrument.MeterRegistry;

/**
* A skeletal {@link CentralDogma} implementation.
*/
public abstract class AbstractCentralDogma implements CentralDogma {

private final ScheduledExecutorService blockingTaskExecutor;
@Nullable
private final MeterRegistry meterRegistry;

/**
* Creates a new instance.
Expand All @@ -52,7 +58,22 @@ public abstract class AbstractCentralDogma implements CentralDogma {
* watched changes.
*/
protected AbstractCentralDogma(ScheduledExecutorService blockingTaskExecutor) {
this(blockingTaskExecutor, null);
}

/**
* Creates a new instance.
*
* @param blockingTaskExecutor the {@link ScheduledExecutorService} which will be used for scheduling the
* tasks related with automatic retries and invoking the callbacks for
* watched changes.
* @param meterRegistry the {@link MeterRegistry} which collects metrics {@link CentralDogma} specific
* metrics. Metrics aren't collected if this value is {@code null}.
*/
protected AbstractCentralDogma(ScheduledExecutorService blockingTaskExecutor,
@Nullable MeterRegistry meterRegistry) {
this.blockingTaskExecutor = requireNonNull(blockingTaskExecutor, "blockingTaskExecutor");
this.meterRegistry = meterRegistry;
}

/**
Expand Down Expand Up @@ -215,4 +236,9 @@ protected final CompletableFuture<Revision> maybeNormalizeRevision(
return CompletableFuture.completedFuture(revision);
}
}

@Override
public final MeterRegistry meterRegistry() {
return meterRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public abstract class AbstractCentralDogmaBuilder<B extends AbstractCentralDogma
private int maxNumRetriesOnReplicationLag = DEFAULT_MAX_NUM_RETRIES_ON_REPLICATION_LAG;
private long retryIntervalOnReplicationLagMillis =
TimeUnit.SECONDS.toMillis(DEFAULT_RETRY_INTERVAL_ON_REPLICATION_LAG_SECONDS);
private boolean metricsEnabled = true;

/**
* Returns {@code this}.
Expand Down Expand Up @@ -415,4 +416,16 @@ public final B retryIntervalOnReplicationLagMillis(long retryIntervalOnReplicati
protected long retryIntervalOnReplicationLagMillis() {
return retryIntervalOnReplicationLagMillis;
}

/**
* Disables {@link CentralDogma} specific metric collection. Metric collection is enabled by default.
*/
public final B disableMetrics() {
metricsEnabled = false;
return self();
}

protected boolean metricsEnabled() {
return metricsEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.concurrent.Executor;
import java.util.function.Function;

import javax.annotation.Nullable;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;

Expand All @@ -42,6 +44,8 @@
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.common.RevisionNotFoundException;

import io.micrometer.core.instrument.MeterRegistry;

/**
* Central Dogma client.
*/
Expand Down Expand Up @@ -591,4 +595,13 @@ <T> Watcher<T> repositoryWatcher(String projectName, String repositoryName, Stri
*/
<T> Watcher<T> repositoryWatcher(String projectName, String repositoryName, String pathPattern,
Function<Revision, ? extends T> function, Executor executor);

/**
* Returns the {@link MeterRegistry} which collects {@link CentralDogma} specific metrics.
* If this value is {@code null}, metric collection is disabled.
*/
@Nullable
default MeterRegistry meterRegistry() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.util.Objects.requireNonNull;

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand All @@ -39,6 +40,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AtomicDouble;
import com.spotify.futures.CompletableFutures;

import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.client.Latest;
import com.linecorp.centraldogma.client.Watcher;
Expand All @@ -49,10 +54,12 @@
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.common.ShuttingDownException;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;

abstract class AbstractWatcher<T> implements Watcher<T> {

private static final Logger logger = LoggerFactory.getLogger(AbstractWatcher.class);
private static final CompletableFuture<Void> COMPLETED_FUTURE = CompletableFuture.completedFuture(null);

private static final long DELAY_ON_SUCCESS_MILLIS = TimeUnit.SECONDS.toMillis(1);
private static final long MIN_INTERVAL_MILLIS = DELAY_ON_SUCCESS_MILLIS * 2;
Expand Down Expand Up @@ -114,6 +121,8 @@ private enum State {
private final List<Map.Entry<BiConsumer<? super Revision, ? super T>, Executor>> updateListeners;
private final AtomicReference<State> state;
private final CompletableFuture<Latest<T>> initialValueFuture;
private final AtomicDouble latestNotifiedRevision = new AtomicDouble();
private final AtomicDouble latestRevision = new AtomicDouble();

private volatile Latest<T> latest;
private volatile ScheduledFuture<?> currentScheduleFuture;
Expand All @@ -127,6 +136,17 @@ protected AbstractWatcher(CentralDogma client, ScheduledExecutorService watchSch
this.repositoryName = requireNonNull(repositoryName, "repositoryName");
this.pathPattern = requireNonNull(pathPattern, "pathPattern");

final MeterRegistry meterRegistry = client.meterRegistry();
if (meterRegistry != null) {
final Iterable<Tag> tags = ImmutableList.of(Tag.of("project", projectName),
Tag.of("repository", repositoryName),
Tag.of("path", pathPattern));
meterRegistry.more().counter("centraldogma.client.watcher.notified.revision", tags, this,
ignored -> latestNotifiedRevision.get());
meterRegistry.more().counter("centraldogma.client.watcher.revision", tags, this,
ignored -> latestRevision.get());
}

updateListeners = new CopyOnWriteArrayList<>();
state = new AtomicReference<>(State.INIT);
initialValueFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -240,6 +260,7 @@ private void doWatch(int numAttemptsSoFar) {
latest = newLatest;
logger.debug("watcher noticed updated file {}/{}{}: rev={}",
projectName, repositoryName, pathPattern, newLatest.revision());
latestRevision.set(latest.revision().major());
notifyListeners();
if (oldLatest == null) {
initialValueFuture.complete(newLatest);
Expand Down Expand Up @@ -296,18 +317,29 @@ private void notifyListeners() {
}

final Latest<T> latest = this.latest;
final List<CompletableFuture<Boolean>> futures = new ArrayList<>();
for (Map.Entry<BiConsumer<? super Revision, ? super T>, Executor> entry : updateListeners) {
final BiConsumer<? super Revision, ? super T> listener = entry.getKey();
final Executor executor = entry.getValue();
executor.execute(() -> {
final CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
try {
listener.accept(latest.revision(), latest.value());
return true;
} catch (Exception e) {
logger.warn("Exception thrown for watcher ({}/{}{}): rev={}",
projectName, repositoryName, pathPattern, latest.revision(), e);
return false;
}
});
}, executor);
futures.add(future);
}

CompletableFutures.allAsList(futures).thenAccept(results -> {
final boolean result = results.stream().allMatch(x -> x);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we update the latestNotifiedRevision regardless of a listener throws an exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added another metric which updates the revision irregardless of whether the notify succeeds.

I think the two metrics combined can give users a good idea of whether properties have been propagated correctly.

if (result) {
latestNotifiedRevision.set(latest.revision().major());
}
});
}

private void handleExecutorShutdown(Executor executor, RejectedExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected boolean removeEldestEntry(Map.Entry<RepoId, Revision> eldest) {
public ReplicationLagTolerantCentralDogma(ScheduledExecutorService blockingTaskExecutor,
CentralDogma delegate, int maxRetries, long retryIntervalMillis,
Supplier<?> currentReplicaHintSupplier) {
super(blockingTaskExecutor);
super(blockingTaskExecutor, delegate.meterRegistry());

requireNonNull(delegate, "delegate");
checkArgument(maxRetries > 0, "maxRetries: %s (expected: > 0)", maxRetries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class ReplicationLagTolerantCentralDogmaTest {
void setUp() {
dogma = new ReplicationLagTolerantCentralDogma(executor, delegate, 3, 0,
currentReplicaHintSupplier);
verify(delegate, times(1)).meterRegistry();
}

@AfterAll
Expand Down
Loading