diff --git a/client/java-armeria-legacy/src/main/java/com/linecorp/centraldogma/client/armeria/legacy/LegacyCentralDogma.java b/client/java-armeria-legacy/src/main/java/com/linecorp/centraldogma/client/armeria/legacy/LegacyCentralDogma.java index be95e939bd..f34bbd072d 100644 --- a/client/java-armeria-legacy/src/main/java/com/linecorp/centraldogma/client/armeria/legacy/LegacyCentralDogma.java +++ b/client/java-armeria-legacy/src/main/java/com/linecorp/centraldogma/client/armeria/legacy/LegacyCentralDogma.java @@ -633,6 +633,11 @@ private static Throwable convertCause(Throwable cause) { return convertedCause; } + @Override + public void close() { + endpointGroup.close(); + } + @FunctionalInterface private interface ThriftCall { void apply(ThriftFuture callback) throws TException; diff --git a/client/java-armeria-xds/build.gradle b/client/java-armeria-xds/build.gradle new file mode 100644 index 0000000000..23337ec0ea --- /dev/null +++ b/client/java-armeria-xds/build.gradle @@ -0,0 +1,11 @@ +dependencies { + api project(':client:java-armeria') + + // Armeria + api libs.armeria.xds + + testImplementation libs.controlplane.server + testImplementation libs.controlplane.cache + testImplementation libs.armeria.junit5 + testImplementation libs.jackson.dataformat.yaml +} diff --git a/client/java-armeria-xds/src/main/java/com/linecorp/centraldogma/client/armeria/xds/XdsCentralDogmaBuilder.java b/client/java-armeria-xds/src/main/java/com/linecorp/centraldogma/client/armeria/xds/XdsCentralDogmaBuilder.java new file mode 100644 index 0000000000..93b5c9d97d --- /dev/null +++ b/client/java-armeria-xds/src/main/java/com/linecorp/centraldogma/client/armeria/xds/XdsCentralDogmaBuilder.java @@ -0,0 +1,324 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.centraldogma.client.armeria.xds; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import com.google.protobuf.Any; + +import com.linecorp.armeria.client.ClientBuilder; +import com.linecorp.armeria.client.ClientFactory; +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.Clients; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.client.encoding.DecodingClient; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.CommonPools; +import com.linecorp.armeria.common.HttpHeaderNames; +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.xds.XdsBootstrap; +import com.linecorp.armeria.xds.client.endpoint.XdsEndpointGroup; +import com.linecorp.centraldogma.client.AbstractCentralDogmaBuilder; +import com.linecorp.centraldogma.client.CentralDogma; +import com.linecorp.centraldogma.client.armeria.ArmeriaClientConfigurator; +import com.linecorp.centraldogma.internal.client.ReplicationLagTolerantCentralDogma; +import com.linecorp.centraldogma.internal.client.armeria.ArmeriaCentralDogma; + +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap.DynamicResources; +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap.StaticResources; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; +import io.envoyproxy.envoy.config.core.v3.Address; +import io.envoyproxy.envoy.config.core.v3.ApiConfigSource; +import io.envoyproxy.envoy.config.core.v3.ApiConfigSource.ApiType; +import io.envoyproxy.envoy.config.core.v3.GrpcService; +import io.envoyproxy.envoy.config.core.v3.GrpcService.EnvoyGrpc; +import io.envoyproxy.envoy.config.core.v3.HeaderValue; +import io.envoyproxy.envoy.config.core.v3.Locality; +import io.envoyproxy.envoy.config.core.v3.Node; +import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.config.core.v3.TransportSocket; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.Endpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; + +/** + * Builds a {@link CentralDogma} client based on an Armeria + * HTTP client. + * This client differs from {@link ArmeriaCentralDogma} in that making requests is done in two phases. + *
    + *
  1. + * An xDS request is made to xDS servers to fetch a {@link Listener} resource which represents + * how to connect to {@link CentralDogma} servers. + *
  2. + *
  3. + * Actual {@link CentralDogma} client requests are made based on the watched Listener resource. + *
  4. + *
+ * Because connection to the actual {@link CentralDogma} is determined by the watched {@link Listener}, + * {@link #hosts()} and {@link #isUseTls()} apply to connecting to the bootstrap only. + * However, because Armeria's xDS implementation isn't complete, the following parameters are applied + * to both xDS and Central Dogma requests. + * + * + *

Note that this module is considered experimental and subject to behavioral change. + */ +@UnstableApi +public final class XdsCentralDogmaBuilder extends AbstractCentralDogmaBuilder { + + private static final String BOOTSTRAP_CLUSTER_NAME = "centraldogma-bootstrap-cluster"; + @VisibleForTesting + static final String DEFAULT_LISTENER_NAME = "centraldogma-listener"; + + private ScheduledExecutorService blockingTaskExecutor = CommonPools.blockingTaskExecutor(); + private ClientFactory clientFactory = ClientFactory.ofDefault(); + private ArmeriaClientConfigurator clientConfigurator = cb -> {}; + private String listenerName = DEFAULT_LISTENER_NAME; + private Locality locality = Locality.getDefaultInstance(); + // an empty string means no local cluster + private String serviceCluster = ""; + + // TODO: @jrhee17 remove this once xDS TLS is fully supported + private Function xdsBootstrapFactory = XdsBootstrap::of; + + /** + * Sets the name of the {@link Listener} that should be requested to the xDS bootstrap servers. + * The default is {@value #DEFAULT_LISTENER_NAME}. + */ + public XdsCentralDogmaBuilder listenerName(String listenerName) { + requireNonNull(listenerName, "listenerName"); + this.listenerName = listenerName; + return this; + } + + /** + * Sets the locality of where the {@link CentralDogma} client will be running. This may be used in applying + * zone aware routing + * and is analogous to + * service-zone. + * This value will be set to {@link Node#getLocality()} in the {@link Bootstrap}. + */ + @UnstableApi + public XdsCentralDogmaBuilder serviceZone(String serviceZone) { + requireNonNull(serviceZone, "serviceZone"); + locality = Locality.newBuilder().setZone(serviceZone).build(); + return this; + } + + /** + * Sets the name of the local service cluster which this client will be located in. + * This may be used in applying + * zone aware routing + * and is analogous to + * service-cluster. + * This value will be set to {@link Node#getCluster()} in the {@link Bootstrap}. + */ + @UnstableApi + public XdsCentralDogmaBuilder serviceCluster(String serviceCluster) { + requireNonNull(serviceCluster, "serviceCluster"); + this.serviceCluster = serviceCluster; + return this; + } + + /** + * Sets the {@link ScheduledExecutorService} dedicated to the execution of blocking tasks or invocations. + * If not set, {@linkplain CommonPools#blockingTaskExecutor() the common pool} is used. + * The {@link ScheduledExecutorService} which will be used for scheduling the tasks related with + * automatic retries and invoking the callbacks for watched changes. + */ + public XdsCentralDogmaBuilder blockingTaskExecutor(ScheduledExecutorService blockingTaskExecutor) { + requireNonNull(blockingTaskExecutor, "blockingTaskExecutor"); + this.blockingTaskExecutor = blockingTaskExecutor; + return this; + } + + /** + * Sets the {@link ArmeriaClientConfigurator} that will configure an underlying + * Armeria client which performs the actual socket I/O. + * + *

Note that this doesn't affect the client making requests to the bootstrap servers. + */ + public XdsCentralDogmaBuilder clientConfigurator(ArmeriaClientConfigurator clientConfigurator) { + this.clientConfigurator = requireNonNull(clientConfigurator, "clientConfigurator"); + return this; + } + + /** + * Sets the {@link ClientFactory} that will create an underlying + * Armeria client which performs the actual socket I/O. + * + *

Note that this doesn't affect the client making requests to the bootstrap servers. + */ + public XdsCentralDogmaBuilder clientFactory(ClientFactory clientFactory) { + this.clientFactory = requireNonNull(clientFactory, "clientFactory"); + return this; + } + + @VisibleForTesting + XdsCentralDogmaBuilder xdsBoostrapFactory(Function xdsBootstrapFactory) { + this.xdsBootstrapFactory = requireNonNull(xdsBootstrapFactory, "xdsBootstrapFactory"); + return this; + } + + /** + * Returns a newly-created {@link CentralDogma} instance. + */ + public CentralDogma build() { + final XdsBootstrap xdsBootstrap = xdsBootstrap(); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerName, xdsBootstrap); + final String scheme = "none+" + (isUseTls() ? "https" : "http"); + final ClientBuilder builder = + newClientBuilder(scheme, endpointGroup, cb -> cb.decorator(DecodingClient.newDecorator()), "/"); + final int maxRetriesOnReplicationLag = maxNumRetriesOnReplicationLag(); + + // TODO(ikhoon): Apply ExecutorServiceMetrics for the 'blockingTaskExecutor' once + // https://github.com/line/centraldogma/pull/542 is merged. + final ScheduledExecutorService blockingTaskExecutor = this.blockingTaskExecutor; + + final CentralDogma dogma = new ArmeriaCentralDogma(blockingTaskExecutor, + builder.build(WebClient.class), + accessToken(), + () -> { + endpointGroup.close(); + xdsBootstrap.close(); + }); + if (maxRetriesOnReplicationLag <= 0) { + return dogma; + } else { + return new ReplicationLagTolerantCentralDogma( + blockingTaskExecutor, dogma, maxRetriesOnReplicationLag, + retryIntervalOnReplicationLagMillis(), + () -> { + // FIXME(trustin): Note that this will always return `null` due to a known limitation + // in Armeria: https://github.com/line/armeria/issues/760 + final ClientRequestContext ctx = ClientRequestContext.currentOrNull(); + return ctx != null ? ctx.remoteAddress() : null; + }); + } + } + + private ClientBuilder newClientBuilder(String scheme, EndpointGroup endpointGroup, + Consumer customizer, String path) { + final ClientBuilder builder = Clients.builder(scheme, endpointGroup, path); + customizer.accept(builder); + clientConfigurator.configure(builder); + builder.factory(clientFactory); + return builder; + } + + private boolean isUnresolved() { + final Set hosts = hosts(); + checkState(!hosts.isEmpty(), "No hosts were added."); + final Map> addrByUnresolved = + hosts.stream().collect(Collectors.partitioningBy(InetSocketAddress::isUnresolved)); + // Until multiple clusters are supported, restrict users to either use STATIC or DNS (but not both) + checkState(addrByUnresolved.get(true).isEmpty() || + addrByUnresolved.get(false).isEmpty(), + "Cannot mix resolved and unresolved hosts (%s)", addrByUnresolved); + final InetSocketAddress firstHost = Iterables.get(hosts(), 0); + return firstHost.isUnresolved(); + } + + private XdsBootstrap xdsBootstrap() { + final GrpcService grpcService = GrpcService + .newBuilder() + .setEnvoyGrpc(EnvoyGrpc.newBuilder() + .setClusterName(BOOTSTRAP_CLUSTER_NAME)) + .addInitialMetadata(HeaderValue.newBuilder() + .setKey(HttpHeaderNames.AUTHORIZATION.toString()) + .setValue("Bearer " + accessToken())) + .build(); + final ApiConfigSource apiConfigSource = ApiConfigSource + .newBuilder() + .addGrpcServices(grpcService) + .setApiType(ApiType.AGGREGATED_GRPC) + .build(); + final DynamicResources dynamicResources = + DynamicResources.newBuilder().setAdsConfig(apiConfigSource).build(); + final Bootstrap bootstrap = + Bootstrap.newBuilder() + .setDynamicResources(dynamicResources) + .setNode(Node.newBuilder() + .setCluster(serviceCluster) + .setLocality(locality)) + .setStaticResources(StaticResources.newBuilder().addClusters(bootstrapCluster())) + .build(); + return xdsBootstrapFactory.apply(bootstrap); + } + + private Cluster bootstrapCluster() { + final boolean isUnresolved = isUnresolved(); + + final Cluster.Builder clusterBuilder = Cluster.newBuilder(); + if (isUnresolved) { + clusterBuilder.setType(DiscoveryType.STRICT_DNS); + } else { + clusterBuilder.setType(DiscoveryType.STATIC); + } + + final LocalityLbEndpoints.Builder localityLbEndpointsBuilder = LocalityLbEndpoints.newBuilder(); + for (InetSocketAddress addr : hosts()) { + final LbEndpoint lbEndpoint = fromAddress(addr); + localityLbEndpointsBuilder.addLbEndpoints(lbEndpoint); + } + final ClusterLoadAssignment clusterLoadAssignment = + ClusterLoadAssignment.newBuilder().addEndpoints(localityLbEndpointsBuilder.build()).build(); + + if (isUseTls()) { + clusterBuilder.setTransportSocket( + TransportSocket.newBuilder() + .setName("envoy.transport_sockets.tls") + .setTypedConfig(Any.pack(UpstreamTlsContext.getDefaultInstance()))); + } + + clusterBuilder.setLoadAssignment(clusterLoadAssignment) + .setName(BOOTSTRAP_CLUSTER_NAME); + return clusterBuilder.build(); + } + + private static LbEndpoint fromAddress(InetSocketAddress addr) { + final String hostString = addr.getHostString(); + final int port = addr.getPort(); + final Address address = Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress(hostString) + .setPortValue(port)).build(); + return LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(address)) + .build(); + } +} diff --git a/client/java-armeria-xds/src/main/java/com/linecorp/centraldogma/client/armeria/xds/package-info.java b/client/java-armeria-xds/src/main/java/com/linecorp/centraldogma/client/armeria/xds/package-info.java new file mode 100644 index 0000000000..ecb7f87ce6 --- /dev/null +++ b/client/java-armeria-xds/src/main/java/com/linecorp/centraldogma/client/armeria/xds/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/** + * Armeria's xDS-based Central Dogma client implementation. + * @see Java client library + */ +@NonNullByDefault +package com.linecorp.centraldogma.client.armeria.xds; + +import com.linecorp.centraldogma.common.util.NonNullByDefault; diff --git a/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/AuthUpstreamTest.java b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/AuthUpstreamTest.java new file mode 100644 index 0000000000..95b9757439 --- /dev/null +++ b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/AuthUpstreamTest.java @@ -0,0 +1,133 @@ +/* + * Copyright 2024 LINE Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.client.armeria.xds; + +import static com.linecorp.armeria.common.util.UnmodifiableFuture.completedFuture; +import static com.linecorp.centraldogma.testing.internal.auth.TestAuthMessageUtil.getAccessToken; +import static net.javacrumbs.jsonunit.fluent.JsonFluentAssert.assertThatJson; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import com.linecorp.armeria.common.auth.OAuth2Token; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.auth.AuthService; +import com.linecorp.armeria.server.auth.Authorizer; +import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import com.linecorp.centraldogma.client.CentralDogma; +import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.Entry; +import com.linecorp.centraldogma.common.Query; +import com.linecorp.centraldogma.server.CentralDogmaBuilder; +import com.linecorp.centraldogma.testing.internal.auth.TestAuthMessageUtil; +import com.linecorp.centraldogma.testing.internal.auth.TestAuthProviderFactory; +import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; + +import io.envoyproxy.controlplane.cache.v3.SimpleCache; +import io.envoyproxy.controlplane.cache.v3.Snapshot; +import io.envoyproxy.controlplane.server.V3DiscoveryServer; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.listener.v3.Listener; + +class AuthUpstreamTest { + + private static final AtomicLong VERSION_NUMBER = new AtomicLong(); + private static final String GROUP = "key"; + private static final SimpleCache cache = new SimpleCache<>(node -> GROUP); + + @RegisterExtension + static CentralDogmaExtension dogma = new CentralDogmaExtension() { + + @Override + protected void configure(CentralDogmaBuilder builder) { + builder.administrators(TestAuthMessageUtil.USERNAME); + builder.authProviderFactory(new TestAuthProviderFactory()); + } + + @Override + protected void scaffold(CentralDogma client) { + client.createProject("foo").join(); + client.createRepository("foo", "bar") + .join() + .commit("Initial file", Change.ofJsonUpsert("/foo.json", "{ \"a\": \"bar\" }")) + .push() + .join(); + } + }; + + private static final AtomicReference accessTokenRef = new AtomicReference<>(); + + @RegisterExtension + static final ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + final Authorizer tokenAuthorizer = + (ctx, token) -> completedFuture(accessTokenRef.get().equals(token.accessToken())); + sb.decorator(AuthService.builder().addOAuth2(tokenAuthorizer).newDecorator()); + final V3DiscoveryServer v3DiscoveryServer = new V3DiscoveryServer(cache); + sb.service(GrpcService.builder() + .addService(v3DiscoveryServer.getAggregatedDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getListenerDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getClusterDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getRouteDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getEndpointDiscoveryServiceImpl()) + .build()); + } + }; + + @Test + void basicAuthCase() throws Exception { + final String accessToken = getAccessToken(dogma.httpClient(), TestAuthMessageUtil.USERNAME, + TestAuthMessageUtil.PASSWORD); + // so that the xds server can also verify the access token is correctly set + accessTokenRef.set(accessToken); + + final Listener listener = XdsResourceReader.readResourcePath( + "/test-listener.yaml", + Listener.newBuilder(), + ImmutableMap.of("", XdsCentralDogmaBuilder.DEFAULT_LISTENER_NAME, + "", "my-cluster")); + final Cluster cluster = XdsResourceReader.readResourcePath( + "/test-cluster.yaml", + Cluster.newBuilder(), + ImmutableMap.of("", "my-cluster", "", "STATIC", + "", dogma.serverAddress().getPort())); + cache.setSnapshot( + GROUP, + Snapshot.create(ImmutableList.of(cluster), ImmutableList.of(), + ImmutableList.of(listener), ImmutableList.of(), ImmutableList.of(), + String.valueOf(VERSION_NUMBER.incrementAndGet()))); + + try (CentralDogma client = new XdsCentralDogmaBuilder() + .accessToken(accessToken) + .host("127.0.0.1", server.httpPort()).build()) { + final Entry entry = client.forRepo("foo", "bar") + .file(Query.ofJsonPath("/foo.json")) + .get() + .get(); + assertThatJson(entry.content()).node("a").isStringEqualTo("bar"); + } + } +} diff --git a/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/SimpleXdsUpstreamTest.java b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/SimpleXdsUpstreamTest.java new file mode 100644 index 0000000000..44c0c3f52b --- /dev/null +++ b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/SimpleXdsUpstreamTest.java @@ -0,0 +1,204 @@ +/* + * Copyright 2024 LINE Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.client.armeria.xds; + +import static net.javacrumbs.jsonunit.fluent.JsonFluentAssert.assertThatJson; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.ClientRequestContextCaptor; +import com.linecorp.armeria.client.Clients; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import com.linecorp.centraldogma.client.CentralDogma; +import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.Entry; +import com.linecorp.centraldogma.common.Query; +import com.linecorp.centraldogma.server.CentralDogmaBuilder; +import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; + +import io.envoyproxy.controlplane.cache.v3.SimpleCache; +import io.envoyproxy.controlplane.cache.v3.Snapshot; +import io.envoyproxy.controlplane.server.V3DiscoveryServer; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.listener.v3.Listener; + +class SimpleXdsUpstreamTest { + + private static final AtomicLong VERSION_NUMBER = new AtomicLong(); + private static final String GROUP = "key"; + private static final SimpleCache cache = new SimpleCache<>(node -> GROUP); + + @RegisterExtension + static final ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + sb.port(0, SessionProtocol.HTTP); + sb.port(0, SessionProtocol.HTTP); + sb.port(0, SessionProtocol.HTTP); + final V3DiscoveryServer v3DiscoveryServer = new V3DiscoveryServer(cache); + sb.service(GrpcService.builder() + .addService(v3DiscoveryServer.getAggregatedDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getListenerDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getClusterDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getRouteDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getEndpointDiscoveryServiceImpl()) + .build()); + } + }; + + @RegisterExtension + static CentralDogmaExtension dogma = new CentralDogmaExtension() { + + @Override + protected void configure(CentralDogmaBuilder builder) { + builder.port(0, SessionProtocol.HTTP); + builder.port(0, SessionProtocol.HTTP); + } + + @Override + protected void scaffold(CentralDogma client) { + client.createProject("foo").join(); + client.createRepository("foo", "bar") + .join() + .commit("Initial file", Change.ofJsonUpsert("/foo.json", "{ \"a\": \"bar\" }")) + .push() + .join(); + } + }; + + @Test + void singleBootstrapSingleUpstream() throws Exception { + final Listener listener = XdsResourceReader.readResourcePath( + "/test-listener.yaml", + Listener.newBuilder(), + ImmutableMap.of("", XdsCentralDogmaBuilder.DEFAULT_LISTENER_NAME, + "", "my-cluster")); + final Cluster cluster = XdsResourceReader.readResourcePath( + "/test-cluster.yaml", + Cluster.newBuilder(), + ImmutableMap.of("", "my-cluster", "", "STATIC", + "", dogma.serverAddress().getPort())); + cache.setSnapshot( + GROUP, + Snapshot.create(ImmutableList.of(cluster), ImmutableList.of(), + ImmutableList.of(listener), ImmutableList.of(), ImmutableList.of(), + String.valueOf(VERSION_NUMBER.incrementAndGet()))); + + try (CentralDogma client = new XdsCentralDogmaBuilder().host("127.0.0.1", server.httpPort()).build()) { + final Entry entry = client.forRepo("foo", "bar") + .file(Query.ofJsonPath("/foo.json")) + .get() + .get(); + assertThatJson(entry.content()).node("a").isStringEqualTo("bar"); + } + } + + @Test + void multiBootstrapMultiUpstream() throws Exception { + final List dogmaPorts = dogma.dogma().activePorts().values().stream().map( + port -> port.localAddress().getPort()).collect(Collectors.toList()); + final List serverPorts = server.server().activePorts().values().stream().map( + port -> port.localAddress().getPort()).collect(Collectors.toList()); + assertThat(dogmaPorts).hasSize(3); + + final Listener listener = XdsResourceReader.readResourcePath( + "/test-listener.yaml", + Listener.newBuilder(), + ImmutableMap.of("", XdsCentralDogmaBuilder.DEFAULT_LISTENER_NAME, + "", "my-cluster")); + final Cluster cluster = XdsResourceReader.readResourcePath( + "/test-cluster-multiendpoint.yaml", + Cluster.newBuilder(), + ImmutableMap.of("", "my-cluster", "", "STATIC", + "", dogmaPorts.get(0), + "", dogmaPorts.get(1), + "", dogmaPorts.get(2))); + cache.setSnapshot( + GROUP, + Snapshot.create(ImmutableList.of(cluster), ImmutableList.of(), + ImmutableList.of(listener), ImmutableList.of(), ImmutableList.of(), + String.valueOf(VERSION_NUMBER.incrementAndGet()))); + + final XdsCentralDogmaBuilder builder = new XdsCentralDogmaBuilder(); + for (Integer port : serverPorts) { + builder.host("127.0.0.1", port); + } + final Set selectedPorts = new HashSet<>(); + try (CentralDogma client = builder.build()) { + await().untilAsserted(() -> assertThat(client.whenEndpointReady()).isDone()); + // RoundRobinStrategy guarantees that each port will be selected once + for (int i = 0; i < dogmaPorts.size(); i++) { + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + final Entry entry = client.forRepo("foo", "bar") + .file(Query.ofJsonPath("/foo.json")) + .get() + .get(); + assertThatJson(entry.content()).node("a").isStringEqualTo("bar"); + final ClientRequestContext ctx = captor.get(); + selectedPorts.add(ctx.endpoint().port()); + } + } + } + assertThat(selectedPorts).containsExactlyInAnyOrderElementsOf(dogmaPorts); + } + + @Test + void customListenerName() throws Exception { + final Listener listener = XdsResourceReader.readResourcePath( + "/test-listener.yaml", + Listener.newBuilder(), + ImmutableMap.of("", "my-listener", + "", "my-cluster")); + final Cluster cluster = XdsResourceReader.readResourcePath( + "/test-cluster.yaml", + Cluster.newBuilder(), + ImmutableMap.of("", "my-cluster", "", "STATIC", + "", dogma.serverAddress().getPort())); + cache.setSnapshot( + GROUP, + Snapshot.create(ImmutableList.of(cluster), ImmutableList.of(), + ImmutableList.of(listener), ImmutableList.of(), ImmutableList.of(), + String.valueOf(VERSION_NUMBER.incrementAndGet()))); + + try (CentralDogma client = new XdsCentralDogmaBuilder() + .listenerName("my-listener") + .host("127.0.0.1", server.httpPort()).build()) { + final Entry entry = client.forRepo("foo", "bar") + .file(Query.ofJsonPath("/foo.json")) + .get() + .get(); + assertThatJson(entry.content()).node("a").isStringEqualTo("bar"); + } + } +} diff --git a/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/TlsUpstreamTest.java b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/TlsUpstreamTest.java new file mode 100644 index 0000000000..e91de9e2ee --- /dev/null +++ b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/TlsUpstreamTest.java @@ -0,0 +1,143 @@ +/* + * Copyright 2024 LINE Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.client.armeria.xds; + +import static net.javacrumbs.jsonunit.fluent.JsonFluentAssert.assertThatJson; + +import java.lang.reflect.Constructor; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import com.linecorp.armeria.client.ClientFactory; +import com.linecorp.armeria.client.grpc.GrpcClientBuilder; +import com.linecorp.armeria.common.CommonPools; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import com.linecorp.armeria.xds.XdsBootstrap; +import com.linecorp.centraldogma.client.CentralDogma; +import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.Entry; +import com.linecorp.centraldogma.common.Query; +import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; + +import io.envoyproxy.controlplane.cache.v3.SimpleCache; +import io.envoyproxy.controlplane.cache.v3.Snapshot; +import io.envoyproxy.controlplane.server.V3DiscoveryServer; +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.netty.util.concurrent.EventExecutor; + +class TlsUpstreamTest { + + private static final AtomicLong VERSION_NUMBER = new AtomicLong(); + private static final String GROUP = "key"; + private static final SimpleCache cache = new SimpleCache<>(node -> GROUP); + + @RegisterExtension + static CentralDogmaExtension dogma = new CentralDogmaExtension(true) { + + @Override + protected void scaffold(CentralDogma client) { + client.createProject("foo").join(); + client.createRepository("foo", "bar") + .join() + .commit("Initial file", Change.ofJsonUpsert("/foo.json", "{ \"a\": \"bar\" }")) + .push() + .join(); + } + }; + + @RegisterExtension + static final ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + sb.port(0, SessionProtocol.HTTPS); + sb.tlsSelfSigned(); + final V3DiscoveryServer v3DiscoveryServer = new V3DiscoveryServer(cache); + sb.service(GrpcService.builder() + .addService(v3DiscoveryServer.getAggregatedDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getListenerDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getClusterDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getRouteDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getEndpointDiscoveryServiceImpl()) + .build()); + } + }; + + @Test + void bootstrapTlsUpstreamTls() throws Exception { + // so that the xds server can also verify the access token is correctly set + final Listener listener = XdsResourceReader.readResourcePath( + "/test-listener.yaml", + Listener.newBuilder(), + ImmutableMap.of("", XdsCentralDogmaBuilder.DEFAULT_LISTENER_NAME, + "", "my-cluster")); + final Cluster cluster = XdsResourceReader.readResourcePath( + "/test-cluster.yaml", + Cluster.newBuilder(), + ImmutableMap.of("", "my-cluster", "", "STATIC", + "", dogma.serverAddress().getPort())); + cache.setSnapshot( + GROUP, + Snapshot.create(ImmutableList.of(cluster), ImmutableList.of(), + ImmutableList.of(listener), ImmutableList.of(), ImmutableList.of(), + String.valueOf(VERSION_NUMBER.incrementAndGet()))); + + try (CentralDogma client = new XdsCentralDogmaBuilder() + .useTls(true) + .clientFactory(ClientFactory.insecure()) + .xdsBoostrapFactory(TlsUpstreamTest::insecureXdsBootstrap) + .host("127.0.0.1", server.httpsPort()).build()) { + final Entry entry = client.forRepo("foo", "bar") + .file(Query.ofJsonPath("/foo.json")) + .get() + .get(); + assertThatJson(entry.content()).node("a").isStringEqualTo("bar"); + } + } + + /** + * A dirty workaround to set {@link ClientFactory#insecure()} when making requests to the xDS server. + */ + private static XdsBootstrap insecureXdsBootstrap(Bootstrap bootstrap) { + try { + final Class bootstrapImplClazz = + TlsUpstreamTest.class.getClassLoader() + .loadClass("com.linecorp.armeria.xds.XdsBootstrapImpl"); + final Constructor ctor = + bootstrapImplClazz + .getDeclaredConstructor(Bootstrap.class, EventExecutor.class, Consumer.class); + ctor.setAccessible(true); + return (XdsBootstrap) ctor.newInstance(bootstrap, CommonPools.workerGroup().next(), + (Consumer) grpcClientBuilder -> { + grpcClientBuilder.factory(ClientFactory.insecure()); + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/XdsResourceReader.java b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/XdsResourceReader.java new file mode 100644 index 0000000000..0715fc2399 --- /dev/null +++ b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/XdsResourceReader.java @@ -0,0 +1,71 @@ +/* + * Copyright 2024 LINE Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.client.armeria.xds; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Map.Entry; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Resources; +import com.google.protobuf.GeneratedMessageV3; +import com.google.protobuf.util.JsonFormat; +import com.google.protobuf.util.JsonFormat.Parser; +import com.google.protobuf.util.JsonFormat.TypeRegistry; + +import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; + +public final class XdsResourceReader { + + private static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + private static final Parser parser = + JsonFormat.parser().usingTypeRegistry(TypeRegistry.newBuilder() + .add(HttpConnectionManager.getDescriptor()) + .add(Router.getDescriptor()) + .build()); + + private XdsResourceReader() {} + + public static T readResourcePath(String resourceName, GeneratedMessageV3.Builder builder) { + return readResourcePath(resourceName, builder, ImmutableMap.of()); + } + + @SuppressWarnings("unchecked") + public static T readResourcePath(String resourceName, GeneratedMessageV3.Builder builder, + Map variablesMap) { + final URL resource = XdsResourceReader.class.getResource(resourceName); + checkNotNull(resource, "Couldn't find resource (%s)", resourceName); + try { + String resourceStr = Resources.toString(resource, StandardCharsets.UTF_8); + for (Entry entry : variablesMap.entrySet()) { + resourceStr = resourceStr.replaceAll(entry.getKey(), entry.getValue().toString()); + } + final JsonNode jsonNode = mapper.reader().readTree(resourceStr); + parser.merge(jsonNode.toString(), builder); + } catch (Exception e) { + throw new RuntimeException(e); + } + return (T) builder.build(); + } +} diff --git a/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/XdsResourceReaderTest.java b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/XdsResourceReaderTest.java new file mode 100644 index 0000000000..e8b2ffe87e --- /dev/null +++ b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/XdsResourceReaderTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2024 LINE Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.client.armeria.xds; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +import com.google.common.collect.ImmutableMap; + +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; + +class XdsResourceReaderTest { + + @Test + void basicCase() throws Exception { + final Listener listener = XdsResourceReader.readResourcePath( + "/test-listener.yaml", + Listener.newBuilder(), + ImmutableMap.of("", "listener_0", "", "my-cluster")); + assertThat(listener.getName()).isEqualTo("listener_0"); + final HttpConnectionManager manager = listener.getApiListener().getApiListener() + .unpack(HttpConnectionManager.class); + assertThat(manager.getRouteConfig().getName()).isEqualTo("local_route"); + assertThat(manager.getRouteConfig().getVirtualHosts(0).getRoutes(0) + .getRoute().getCluster()).isEqualTo("my-cluster"); + } + + @Test + void clusterReplacements() throws Exception { + final Cluster cluster = XdsResourceReader.readResourcePath( + "/test-cluster.yaml", + Cluster.newBuilder(), + ImmutableMap.of("", "test-cluster", "", "EDS", "", "8080")); + assertThat(cluster.getName()).isEqualTo("test-cluster"); + assertThat(cluster.getType()).isEqualTo(DiscoveryType.EDS); + final ClusterLoadAssignment loadAssignment = cluster.getLoadAssignment(); + assertThat(loadAssignment.getClusterName()).isEqualTo("test-cluster"); + assertThat(loadAssignment.getEndpoints(0).getLbEndpoints(0).getEndpoint().getAddress() + .getSocketAddress().getPortValue()) + .isEqualTo(8080); + } +} diff --git a/client/java-armeria-xds/src/test/resources/test-cluster-multiendpoint.yaml b/client/java-armeria-xds/src/test/resources/test-cluster-multiendpoint.yaml new file mode 100644 index 0000000000..fd2f359ed0 --- /dev/null +++ b/client/java-armeria-xds/src/test/resources/test-cluster-multiendpoint.yaml @@ -0,0 +1,23 @@ +name: +connect_timeout: 0.25s +type: +lb_policy: ROUND_ROBIN +load_assignment: + cluster_name: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: diff --git a/client/java-armeria-xds/src/test/resources/test-cluster.yaml b/client/java-armeria-xds/src/test/resources/test-cluster.yaml new file mode 100644 index 0000000000..79e029f31e --- /dev/null +++ b/client/java-armeria-xds/src/test/resources/test-cluster.yaml @@ -0,0 +1,13 @@ +name: +connect_timeout: 0.25s +type: +lb_policy: ROUND_ROBIN +load_assignment: + cluster_name: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: diff --git a/client/java-armeria-xds/src/test/resources/test-listener.yaml b/client/java-armeria-xds/src/test/resources/test-listener.yaml new file mode 100644 index 0000000000..95491f1dda --- /dev/null +++ b/client/java-armeria-xds/src/test/resources/test-listener.yaml @@ -0,0 +1,22 @@ +name: +address: + socket_address: + address: 0.0.0.0 + port_value: 8080 +api_listener: + api_listener: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: ingress_http + http_protocol_options: + enable_trailers: true + codec_type: AUTO + route_config: + name: local_route + virtual_hosts: + - name: local_service + domains: [ "*" ] + routes: + - match: + prefix: "/" + route: + cluster: diff --git a/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/ArmeriaCentralDogmaBuilder.java b/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/ArmeriaCentralDogmaBuilder.java index 89bb8b172e..d1c2dba167 100644 --- a/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/ArmeriaCentralDogmaBuilder.java +++ b/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/ArmeriaCentralDogmaBuilder.java @@ -25,6 +25,7 @@ import com.linecorp.armeria.client.endpoint.EndpointGroup; import com.linecorp.centraldogma.client.CentralDogma; import com.linecorp.centraldogma.internal.client.ReplicationLagTolerantCentralDogma; +import com.linecorp.centraldogma.internal.client.armeria.ArmeriaCentralDogma; /** * Builds a {@link CentralDogma} client based on an Armeria @@ -50,7 +51,8 @@ public CentralDogma build() throws UnknownHostException { final CentralDogma dogma = new ArmeriaCentralDogma(blockingTaskExecutor, builder.build(WebClient.class), - accessToken()); + accessToken(), + endpointGroup::close); if (maxRetriesOnReplicationLag <= 0) { return dogma; } else { diff --git a/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/DnsAddressEndpointGroupConfigurator.java b/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/DnsAddressEndpointGroupConfigurator.java index 5cbbe7edc5..ad5e28cd00 100644 --- a/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/DnsAddressEndpointGroupConfigurator.java +++ b/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/DnsAddressEndpointGroupConfigurator.java @@ -18,6 +18,7 @@ import com.linecorp.armeria.client.endpoint.dns.DnsAddressEndpointGroupBuilder; import com.linecorp.centraldogma.client.CentralDogma; +import com.linecorp.centraldogma.internal.client.armeria.ArmeriaCentralDogma; /** * Configures the DNS resolution of the Armeria client of diff --git a/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/ArmeriaCentralDogma.java b/client/java-armeria/src/main/java/com/linecorp/centraldogma/internal/client/armeria/ArmeriaCentralDogma.java similarity index 98% rename from client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/ArmeriaCentralDogma.java rename to client/java-armeria/src/main/java/com/linecorp/centraldogma/internal/client/armeria/ArmeriaCentralDogma.java index 31764fb1a6..72ebd1f99c 100644 --- a/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/ArmeriaCentralDogma.java +++ b/client/java-armeria/src/main/java/com/linecorp/centraldogma/internal/client/armeria/ArmeriaCentralDogma.java @@ -1,7 +1,7 @@ /* - * Copyright 2019 LINE Corporation + * Copyright 2024 LINE Corporation * - * LINE Corporation licenses this file to you under the Apache License, + * LY Corporation licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at: * @@ -14,7 +14,7 @@ * under the License. */ -package com.linecorp.centraldogma.client.armeria; +package com.linecorp.centraldogma.internal.client.armeria; import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; @@ -108,7 +108,7 @@ import com.linecorp.centraldogma.internal.Util; import com.linecorp.centraldogma.internal.api.v1.WatchTimeout; -final class ArmeriaCentralDogma extends AbstractCentralDogma { +public final class ArmeriaCentralDogma extends AbstractCentralDogma { private static final MediaType JSON_PATCH_UTF8 = MediaType.JSON_PATCH.withCharset(StandardCharsets.UTF_8); @@ -137,11 +137,14 @@ final class ArmeriaCentralDogma extends AbstractCentralDogma { private final WebClient client; private final String authorization; + private final SafeCloseable safeCloseable; - ArmeriaCentralDogma(ScheduledExecutorService blockingTaskExecutor, WebClient client, String accessToken) { + public ArmeriaCentralDogma(ScheduledExecutorService blockingTaskExecutor, + WebClient client, String accessToken, SafeCloseable safeCloseable) { super(blockingTaskExecutor); this.client = requireNonNull(client, "client"); authorization = "Bearer " + requireNonNull(accessToken, "accessToken"); + this.safeCloseable = safeCloseable; } @Override @@ -1136,4 +1139,9 @@ private static T handleErrorResponse(AggregatedHttpResponse res) { throw new CentralDogmaException("unexpected response: " + res.headers() + ", " + res.contentUtf8()); } + + @Override + public void close() { + safeCloseable.close(); + } } diff --git a/client/java-armeria/src/main/java/com/linecorp/centraldogma/internal/client/armeria/package-info.java b/client/java-armeria/src/main/java/com/linecorp/centraldogma/internal/client/armeria/package-info.java new file mode 100644 index 0000000000..a2f255756a --- /dev/null +++ b/client/java-armeria/src/main/java/com/linecorp/centraldogma/internal/client/armeria/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/** + * Armeria-based Central Dogma client internal implementation. + * + */ +@NonNullByDefault +package com.linecorp.centraldogma.internal.client.armeria; + +import com.linecorp.centraldogma.common.util.NonNullByDefault; diff --git a/client/java/src/main/java/com/linecorp/centraldogma/client/CentralDogma.java b/client/java/src/main/java/com/linecorp/centraldogma/client/CentralDogma.java index 5b94c531f5..ab5c729406 100644 --- a/client/java/src/main/java/com/linecorp/centraldogma/client/CentralDogma.java +++ b/client/java/src/main/java/com/linecorp/centraldogma/client/CentralDogma.java @@ -48,7 +48,7 @@ /** * Central Dogma client. */ -public interface CentralDogma { +public interface CentralDogma extends AutoCloseable { /** * Returns a new {@link CentralDogmaRepository} that is used to send a request to the specified @@ -850,4 +850,11 @@ Watcher repositoryWatcher(String projectName, String repositoryName, Stri * without additional delay. */ CompletableFuture whenEndpointReady(); + + /** + * Closes underlying resources that may be used when making requests to the server such as + * health checking or dns queries. + */ + @Override + void close() throws Exception; } diff --git a/client/java/src/main/java/com/linecorp/centraldogma/internal/client/ReplicationLagTolerantCentralDogma.java b/client/java/src/main/java/com/linecorp/centraldogma/internal/client/ReplicationLagTolerantCentralDogma.java index 1114ff385c..0f91343724 100644 --- a/client/java/src/main/java/com/linecorp/centraldogma/internal/client/ReplicationLagTolerantCentralDogma.java +++ b/client/java/src/main/java/com/linecorp/centraldogma/internal/client/ReplicationLagTolerantCentralDogma.java @@ -797,6 +797,11 @@ private static Object resultOrCause(@Nullable Object res, @Nullable Throwable ca return res != null ? res : cause; } + @Override + public void close() throws Exception { + delegate.close(); + } + private static final class RepoId { private final String projectName; private final String repositoryName; diff --git a/dependencies.toml b/dependencies.toml index 797a81d2e9..6e98092a25 100644 --- a/dependencies.toml +++ b/dependencies.toml @@ -4,29 +4,33 @@ # [versions] armeria = "1.30.0" -assertj = "3.26.0" -awaitility = "4.2.1" +assertj = "3.26.3" +awaitility = "4.2.2" bouncycastle = "1.78.1" +# Don"t upgrade Caffeine to 3.x that requires Java 11. caffeine = "2.9.3" checkstyle = "10.3.3" controlplane = "1.0.45" -curator = "5.6.0" +# Ensure that we use the same ZooKeeper version as what Curator depends on. +# See: https://github.com/apache/curator/blob/master/pom.xml +# (Switch to the right tag to find out the right version.) +curator = "5.7.0" # Do not upgrade cron-utils until there's another CVE or Armeria's SLF4J and Logback are upgraded. cron-utils = "9.2.0" diffutils = "1.3.0" docker = "9.4.0" download = "5.6.0" -dropwizard-metrics = "4.2.21" +dropwizard-metrics = "4.2.26" eddsa = "0.3.0" findbugs = "3.0.2" futures-completable = "0.3.6" -grpc-java = "1.64.0" +grpc-java = "1.66.0" guava = "33.2.1-jre" guava-failureaccess = "1.0.1" hamcrest-library = "2.2" hibernate-validator6 = "6.2.5.Final" hibernate-validator8 = "8.0.1.Final" -jackson = "2.17.1" +jackson = "2.17.2" javassist = "3.30.2-GA" javax-annotation = "1.3.2" javax-inject = "1" @@ -35,20 +39,23 @@ jcommander = "1.82" jetty-alpn-api = "1.1.3.v20160715" jetty-alpn-agent = "2.0.10" jgit = "5.13.3.202401111512-r" -jgit6 = "6.9.0.202403050737-r" +jgit6 = "6.10.0.202406032230-r" junit4 = "4.13.2" -junit5 = "5.10.2" +junit5 = "5.11.0" jsch = "0.1.55" +# Don't update `json-path` version json-path = "2.2.0" +# 3.0.0 requires java 17 json-unit = "2.38.0" jmh-core = "1.37" jmh-gradle-plugin = "0.7.2" jxr = "0.2.1" logback12 = { strictly = "1.2.13" } -logback15 = { strictly = "1.5.5" } +logback15 = { strictly = "1.5.7" } logback = "1.2.13" -micrometer = "1.13.0" -mina-sshd = "2.12.1" +micrometer = "1.13.3" +mina-sshd = "2.13.2" +# Don't uprade mockito to 5.x.x that requires Java 11 mockito = "4.11.0" nexus-publish-plugin = "2.0.0" node-gradle-plugin = "7.0.2" @@ -57,20 +64,25 @@ proguard = "7.4.2" protobuf = "3.25.1" protobuf-gradle-plugin = "0.8.19" quartz = "2.3.2" +reflections = "0.9.11" shadow-gradle-plugin = "7.1.2" +# Don't update `shiro` version shiro = "1.3.2" slf4j1 = { strictly = "1.7.36" } -slf4j2 = { strictly = "2.0.12" } +slf4j2 = { strictly = "2.0.16" } # Ensure that we use the same Snappy version as what Curator depends on. # See: https://github.com/apache/curator/blob/master/pom.xml -snappy = "1.1.10.4" +snappy = "1.1.10.5" sphinx = "2.10.1" spring-boot2 = "2.7.18" -spring-boot3 = "3.3.0" +spring-boot3 = "3.3.2" spring-test-junit5 = "1.5.0" -testcontainers = "1.19.8" +testcontainers = "1.20.1" thrift09 = { strictly = "0.9.3-1" } -zookeeper = "3.7.2" +# Ensure that we use the same ZooKeeper version as what Curator depends on. +# See: https://github.com/apache/curator/blob/master/pom.xml +# (Switch to the right tag to find out the right version.) +zookeeper = "3.9.1" [boms] armeria = { module = "com.linecorp.armeria:armeria-bom", version.ref = "armeria" } @@ -112,7 +124,6 @@ version.ref = "bouncycastle" module = "org.bouncycastle:bcprov-jdk18on" version.ref = "bouncycastle" -# Don"t upgrade Caffeine to 3.x that requires Java 11. [libraries.caffeine] module = "com.github.ben-manes.caffeine:caffeine" version.ref = "caffeine" @@ -127,9 +138,6 @@ module = "com.cronutils:cron-utils" version.ref = "cron-utils" relocations = { from = "com.cronutils", to = "com.linecorp.centraldogma.internal.shaded.cronutils" } -# Ensure that we use the same ZooKeeper version as what Curator depends on. -# See: https://github.com/apache/curator/blob/master/pom.xml -# (Switch to the right tag to find out the right version.) [libraries.curator-recipes] module = "org.apache.curator:curator-recipes" version.ref = "curator" @@ -185,7 +193,6 @@ relocations = { from = "com.google.common", to = "com.linecorp.centraldogma.inte module = "org.hamcrest:hamcrest-library" version.ref = "hamcrest-library" -# Need to wait for spring boot 3 before upgrading to 7.x.x [libraries.hibernate-validator6] module = "org.hibernate.validator:hibernate-validator" version.ref = "hibernate-validator6" @@ -202,6 +209,8 @@ javadocs = "https://fasterxml.github.io/jackson-core/javadoc/2.13/" [libraries.jackson-databind] module = "com.fasterxml.jackson.core:jackson-databind" javadocs = "https://fasterxml.github.io/jackson-databind/javadoc/2.13/" +[libraries.jackson-dataformat-yaml] +module = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml" [libraries.jackson-datatype-jsr310] module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310" # Only used for testing. See JacksonRequestConverterFunctionTest for more information. @@ -255,13 +264,11 @@ version.ref = "jmh-core" module = "com.jcraft:jsch" version.ref = "jsch" -# Don't update `json-path` version [libraries.json-path] module = "com.jayway.jsonpath:json-path" version.ref = "json-path" relocations = { from = "com.jayway.jsonpath", to = "com.linecorp.centraldogma.internal.shaded.jsonpath" } -# 3.0.0 requires java 17 [libraries.json-unit] module = "net.javacrumbs.json-unit:json-unit" version.ref = "json-unit" @@ -292,12 +299,15 @@ module = "org.junit.platform:junit-platform-launcher" [libraries.logback12] module = "ch.qos.logback:logback-classic" version.ref = "logback12" -javadocs = "https://www.javadoc.io/doc/ch.qos.logback/logback-classic/1.2.12/" +javadocs = "https://www.javadoc.io/doc/ch.qos.logback/logback-classic/1.2.13/" [libraries.logback15] module = "ch.qos.logback:logback-classic" version.ref = "logback15" -javadocs = "https://www.javadoc.io/doc/ch.qos.logback/logback-classic/1.5.4/" +javadocs = "https://www.javadoc.io/doc/ch.qos.logback/logback-classic/1.5.7/" +[libraries.controlplane-api] +module = "io.envoyproxy.controlplane:api" +version.ref = "controlplane" [libraries.controlplane-cache] module = "io.envoyproxy.controlplane:cache" version.ref = "controlplane" @@ -343,11 +353,18 @@ version.ref = "protobuf-gradle-plugin" module = "org.quartz-scheduler:quartz" version.ref = "quartz" +[libraries.reflections] +module = "org.reflections:reflections" +version.ref = "reflections" +exclusions = [ + "com.google.errorprone:error_prone_annotations", + "com.google.j2objc:j2objc-annotations", + "org.codehaus.mojo:animal-sniffer-annotations"] + [libraries.shadow-gradle-plugin] module = "gradle.plugin.com.github.johnrengelman:shadow" version.ref = "shadow-gradle-plugin" -# Don't update `shiro` version [libraries.shiro-core] module = "org.apache.shiro:shiro-core" version.ref = "shiro" @@ -425,9 +442,6 @@ version.ref = "testcontainers" module = "org.apache.thrift:libthrift" version.ref = "thrift09" -# Ensure that we use the same ZooKeeper version as what Curator depends on. -# See: https://github.com/apache/curator/blob/master/pom.xml -# (Switch to the right tag to find out the right version.) [libraries.zookeeper] module = "org.apache.zookeeper:zookeeper" version.ref = "zookeeper" diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index d64cd49177..e6441136f3 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index e7646dead0..66cd5a0e49 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.10-all.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/gradlew.bat b/gradlew.bat index 93e3f59f13..25da30dbde 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -43,11 +43,11 @@ set JAVA_EXE=java.exe %JAVA_EXE% -version >NUL 2>&1 if %ERRORLEVEL% equ 0 goto execute -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. +echo. 1>&2 +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 goto fail @@ -57,11 +57,11 @@ set JAVA_EXE=%JAVA_HOME%/bin/java.exe if exist "%JAVA_EXE%" goto execute -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. +echo. 1>&2 +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 goto fail diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/api/auth/ApplicationTokenAuthorizer.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/api/auth/ApplicationTokenAuthorizer.java index 8ef0381069..9aa981ec40 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/api/auth/ApplicationTokenAuthorizer.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/api/auth/ApplicationTokenAuthorizer.java @@ -31,12 +31,14 @@ import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.auth.OAuth2Token; +import com.linecorp.armeria.common.logging.LogLevel; import com.linecorp.armeria.common.util.Exceptions; import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.server.auth.AuthTokenExtractors; import com.linecorp.armeria.server.auth.Authorizer; import com.linecorp.centraldogma.internal.CsrfToken; import com.linecorp.centraldogma.server.internal.admin.auth.AuthUtil; +import com.linecorp.centraldogma.server.internal.admin.service.TokenNotFoundException; import com.linecorp.centraldogma.server.metadata.Token; import com.linecorp.centraldogma.server.metadata.Tokens; import com.linecorp.centraldogma.server.metadata.User; @@ -89,10 +91,15 @@ public CompletionStage authorize(ServiceRequestContext ctx, HttpRequest // Should be authorized by the next authorizer. .exceptionally(voidFunction(cause -> { cause = Exceptions.peel(cause); - if (!(cause instanceof IllegalArgumentException)) { - logger.warn("Application token authorization failed: token={}, addr={}", - token.accessToken(), ctx.clientAddress(), cause); + final LogLevel level; + if (cause instanceof IllegalArgumentException || + cause instanceof TokenNotFoundException) { + level = LogLevel.DEBUG; + } else { + level = LogLevel.WARN; } + level.log(logger, "Failed to authorize an application token: token={}, addr={}", + token.accessToken(), ctx.clientAddress(), cause); res.complete(false); })); diff --git a/settings.gradle b/settings.gradle index 3d90e116f5..30f63381bf 100644 --- a/settings.gradle +++ b/settings.gradle @@ -9,6 +9,7 @@ includeWithFlags ':bom', 'bom' includeWithFlags ':client:java', 'java', 'publish', 'relocate' includeWithFlags ':client:java-armeria', 'java', 'publish', 'relocate' includeWithFlags ':client:java-armeria-legacy', 'java', 'publish', 'relocate' +includeWithFlags ':client:java-armeria-xds', 'java', 'publish', 'relocate' includeWithFlags ':client:java-spring-boot2-autoconfigure', 'java', 'publish', 'relocate', 'no_aggregation' includeWithFlags ':client:java-spring-boot2-starter', 'java', 'publish', 'relocate', 'no_aggregation' includeWithFlags ':client:java-spring-boot3-autoconfigure', 'java17', 'publish', 'relocate' diff --git a/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java b/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java index 36fd7dcc34..4d0e9d2269 100644 --- a/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java +++ b/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java @@ -158,7 +158,7 @@ public final CompletableFuture startAsync(File dataDir) { throw new IOError(e); } - final String uri = "h2c://" + serverAddress.getHostString() + ':' + serverAddress.getPort(); + final String uri = "h2c://127.0.0.1:" + serverAddress.getPort(); final WebClientBuilder webClientBuilder = WebClient.builder(uri); if (accessToken != null) { webClientBuilder.auth(AuthToken.ofOAuth2(accessToken)); diff --git a/xds/build.gradle b/xds/build.gradle index ac6e3a1f7c..f34c1be7d3 100644 --- a/xds/build.gradle +++ b/xds/build.gradle @@ -2,8 +2,10 @@ dependencies { implementation project(':server') implementation libs.armeria.grpc + implementation libs.controlplane.api implementation libs.controlplane.cache implementation libs.controlplane.server + implementation libs.reflections testImplementation libs.armeria.junit5 testImplementation libs.armeria.xds diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlanePlugin.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlanePlugin.java index 1f6626bb16..04a5ec1ef8 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlanePlugin.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlanePlugin.java @@ -68,10 +68,6 @@ import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; -import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router; -import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; @@ -174,13 +170,7 @@ private void init0(PluginInitContext pluginInitContext) { .jsonMarshallerFactory( serviceDescriptor -> GrpcJsonMarshaller .builder() - //TODO(minwoox): Automate the registration of the extension messages. - .jsonMarshallerCustomizer(builder -> { - builder.register(HttpConnectionManager.getDefaultInstance()) - .register(Router.getDefaultInstance()) - .register(UpstreamTlsContext.getDefaultInstance()) - .register(DownstreamTlsContext.getDefaultInstance()); - }) + .jsonMarshallerCustomizer(XdsResourceManager::registerEnvoyExtension) .build(serviceDescriptor)) .enableHttpJsonTranscoding(true).build(); sb.service(xdsApplicationService, pluginInitContext.authService()); diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java index 8e6470ca4d..b2f426530a 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java @@ -21,13 +21,18 @@ import static java.util.Objects.requireNonNull; import java.io.IOException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.util.regex.Pattern; import org.curioswitch.common.protobuf.json.MessageMarshaller; +import org.reflections.Reflections; +import org.reflections.scanners.SubTypesScanner; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableList; import com.google.protobuf.Empty; +import com.google.protobuf.GeneratedMessageV3; import com.google.protobuf.Message; import com.linecorp.centraldogma.common.Author; @@ -44,10 +49,7 @@ import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; -import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; import io.grpc.Status; import io.grpc.stub.StreamObserver; @@ -56,19 +58,35 @@ public final class XdsResourceManager { public static final String RESOURCE_ID_PATTERN_STRING = "[a-z](?:[a-z0-9-_/]*[a-z0-9])?"; public static final Pattern RESOURCE_ID_PATTERN = Pattern.compile('^' + RESOURCE_ID_PATTERN_STRING + '$'); - //TODO(minwoox): Automate the registration of the extension message types. public static final MessageMarshaller JSON_MESSAGE_MARSHALLER = - MessageMarshaller.builder().omittingInsignificantWhitespace(true) - .register(Listener.getDefaultInstance()) - .register(Cluster.getDefaultInstance()) - .register(ClusterLoadAssignment.getDefaultInstance()) - .register(Router.getDefaultInstance()) - // extensions - .register(RouteConfiguration.getDefaultInstance()) - .register(HttpConnectionManager.getDefaultInstance()) - .register(UpstreamTlsContext.getDefaultInstance()) - .register(DownstreamTlsContext.getDefaultInstance()) - .build(); + registerEnvoyExtension( + MessageMarshaller.builder().omittingInsignificantWhitespace(true) + .register(Listener.getDefaultInstance()) + .register(Cluster.getDefaultInstance()) + .register(ClusterLoadAssignment.getDefaultInstance()) + .register(RouteConfiguration.getDefaultInstance())) + .build(); + + public static MessageMarshaller.Builder registerEnvoyExtension(MessageMarshaller.Builder builder) { + final Reflections reflections = new Reflections( + "io.envoyproxy.envoy.extensions", HttpConnectionManager.class.getClassLoader(), + new SubTypesScanner(true)); + reflections.getSubTypesOf(GeneratedMessageV3.class) + .stream() + .filter(c -> !c.getName().contains("$")) // exclude subclasses + .filter(XdsResourceManager::hasGetDefaultInstanceMethod) + .forEach(builder::register); + return builder; + } + + private static boolean hasGetDefaultInstanceMethod(Class clazz) { + try { + final Method method = clazz.getMethod("getDefaultInstance"); + return method.getParameterCount() == 0 && Modifier.isStatic(method.getModifiers()); + } catch (NoSuchMethodException ignored) { + return false; + } + } public static String removePrefix(String prefix, String name) { if (!name.startsWith(prefix)) {