From 3eb7c78abadb683138bb2ff8f03c1c8270dc9804 Mon Sep 17 00:00:00 2001 From: minwoox Date: Wed, 14 Aug 2024 15:30:32 +0900 Subject: [PATCH] Address comments from @ikhoon --- .../xds/internal/ControlPlanePlugin.java | 8 ++--- .../xds/internal/ControlPlaneService.java | 2 +- .../xds/internal/XdsResourceManager.java | 12 +++---- .../internal/XdsResourceWatchingService.java | 2 +- .../XdsKubernetesEndpointFetchingPlugin.java | 6 ++++ .../XdsKubernetesEndpointFetchingService.java | 18 +++++----- .../xds/k8s/v1/XdsKubernetesService.java | 20 ++++++----- .../xds/k8s/v1/xds_kubernetes.proto | 18 +++++----- .../xds/k8s/v1/XdsKubernetesServiceTest.java | 34 +++++++++---------- 9 files changed, 65 insertions(+), 55 deletions(-) diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlanePlugin.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlanePlugin.java index 4b74ab90b0..e796a124fd 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlanePlugin.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlanePlugin.java @@ -35,12 +35,9 @@ public final class ControlPlanePlugin extends AllReplicasPlugin { @Nullable private volatile ControlPlaneService controlPlaneService; - public static final long BACKOFF_SECONDS = 30; - @Override public void init(PluginInitContext pluginInitContext) { final InternalProjectInitializer projectInitializer = pluginInitContext.internalProjectInitializer(); - pluginInitContext.commandExecutor(); projectInitializer.initialize(XDS_CENTRAL_DOGMA_PROJECT); final ControlPlaneService controlPlaneService = new ControlPlaneService( pluginInitContext.projectManager().get(XDS_CENTRAL_DOGMA_PROJECT), @@ -61,8 +58,9 @@ public CompletionStage start(PluginContext context) { @Override public CompletionStage stop(PluginContext context) { final ControlPlaneService controlPlaneService = this.controlPlaneService; - assert controlPlaneService != null; - controlPlaneService.stop(); + if (controlPlaneService != null) { + controlPlaneService.stop(); + } return UnmodifiableFuture.completedFuture(null); } diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java index 541c7a602e..c29217b60f 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java @@ -93,7 +93,7 @@ public final class ControlPlaneService extends XdsResourceWatchingService { Future start(PluginInitContext pluginInitContext) { return controlPlaneExecutor.submit(() -> { - start(); + init(); final CommandExecutor commandExecutor = pluginInitContext.commandExecutor(); final V3DiscoveryServer server = new V3DiscoveryServer(new LoggingDiscoveryServerCallbacks(), cache); diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java index 89b543eb8c..462069cf77 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java @@ -37,9 +37,9 @@ import com.linecorp.centraldogma.server.command.CommandExecutor; import com.linecorp.centraldogma.server.storage.project.Project; import com.linecorp.centraldogma.server.storage.repository.Repository; -import com.linecorp.centraldogma.xds.k8s.v1.CreateWatcherRequest; -import com.linecorp.centraldogma.xds.k8s.v1.DeleteWatcherRequest; -import com.linecorp.centraldogma.xds.k8s.v1.UpdateWatcherRequest; +import com.linecorp.centraldogma.xds.k8s.v1.CreateServiceEndpointWatcherRequest; +import com.linecorp.centraldogma.xds.k8s.v1.DeleteServiceEndpointWatcherRequest; +import com.linecorp.centraldogma.xds.k8s.v1.UpdateServiceEndpointWatcherRequest; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; @@ -64,9 +64,9 @@ public final class XdsResourceManager { .register(Cluster.getDefaultInstance()) .register(ClusterLoadAssignment.getDefaultInstance()) .register(RouteConfiguration.getDefaultInstance()) - .register(CreateWatcherRequest.getDefaultInstance()) - .register(UpdateWatcherRequest.getDefaultInstance()) - .register(DeleteWatcherRequest.getDefaultInstance()) + .register(CreateServiceEndpointWatcherRequest.getDefaultInstance()) + .register(UpdateServiceEndpointWatcherRequest.getDefaultInstance()) + .register(DeleteServiceEndpointWatcherRequest.getDefaultInstance()) // extensions .register(Router.getDefaultInstance()) .register(HttpConnectionManager.getDefaultInstance()) diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceWatchingService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceWatchingService.java index 22f8353975..35d9611e45 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceWatchingService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceWatchingService.java @@ -74,7 +74,7 @@ protected abstract void handleXdsResource(String path, String contentAsText, Str /** * Must be executed by {@link #executor()}. */ - protected void start() { + protected void init() { for (Repository repository : xdsProject.repos().list().values()) { final String groupName = repository.name(); if (Project.internalRepos().contains(groupName)) { diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingPlugin.java b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingPlugin.java index f0473969f5..7e27ccd635 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingPlugin.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingPlugin.java @@ -20,6 +20,7 @@ import static java.util.Objects.requireNonNull; import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -56,6 +57,11 @@ public synchronized CompletionStage start(PluginContext context) { fetchingService = new XdsKubernetesEndpointFetchingService( context.projectManager().get(XDS_CENTRAL_DOGMA_PROJECT), context.commandExecutor(), context.meterRegistry()); + try { + fetchingService.start().get(60, TimeUnit.SECONDS); + } catch (Throwable t) { + throw new RuntimeException("Failed to start control plane plugin in 60 seconds.", t); + } return UnmodifiableFuture.completedFuture(null); } diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java index 18b66d4389..56573a0d47 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -65,7 +66,6 @@ final class XdsKubernetesEndpointFetchingService extends XdsResourceWatchingServ private static final Pattern WATCHERS_PATTERN = Pattern.compile("(?<=/k8s)/watchers/"); private final CommandExecutor commandExecutor; - private final MeterRegistry meterRegistry; // Only accessed by the executorService. private final Map> kubernetesWatchers = new HashMap<>(); @@ -77,14 +77,16 @@ final class XdsKubernetesEndpointFetchingService extends XdsResourceWatchingServ MeterRegistry meterRegistry) { super(xdsProject); this.commandExecutor = commandExecutor; - this.meterRegistry = meterRegistry; executorService = ExecutorServiceMetrics.monitor( meterRegistry, Executors.newSingleThreadScheduledExecutor( new DefaultThreadFactory("k8s-plugin-executor", true)), "k8sPluginExecutor"); - executorService.execute(this::start); } - synchronized void stop() { + Future start() { + return executorService.submit(this::init); + } + + void stop() { stopped = true; executorService.submit(() -> { kubernetesWatchers.values().forEach(map -> { @@ -108,7 +110,7 @@ protected String pathPattern() { @Override protected void handleXdsResource(String path, String contentAsText, String groupName) throws InvalidProtocolBufferException { - final Watcher.Builder watcherBuilder = Watcher.newBuilder(); + final ServiceEndpointWatcher.Builder watcherBuilder = ServiceEndpointWatcher.newBuilder(); try { JSON_MESSAGE_MARSHALLER.mergeValue(contentAsText, watcherBuilder); } catch (IOException e) { @@ -116,12 +118,12 @@ protected void handleXdsResource(String path, String contentAsText, String group groupName, path, contentAsText, e); return; } - final Watcher k8sWatcher = watcherBuilder.build(); - final KubernetesEndpointGroup kubernetesEndpointGroup = createKubernetesEndpointGroup(k8sWatcher); + final ServiceEndpointWatcher endpointWatcher = watcherBuilder.build(); + final KubernetesEndpointGroup kubernetesEndpointGroup = createKubernetesEndpointGroup(endpointWatcher); final Map watchers = kubernetesWatchers.computeIfAbsent(groupName, unused -> new HashMap<>()); - final String watcherName = k8sWatcher.getName(); + final String watcherName = endpointWatcher.getName(); final KubernetesEndpointGroup oldWatcher = watchers.get(watcherName); if (oldWatcher != null) { oldWatcher.closeAsync(); diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java index d51132dfbb..be125595c3 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java @@ -71,7 +71,8 @@ public XdsKubernetesService(XdsResourceManager xdsResourceManager) { @Blocking @Override - public void createWatcher(CreateWatcherRequest request, StreamObserver responseObserver) { + public void createServiceEndpointWatcher(CreateServiceEndpointWatcherRequest request, + StreamObserver responseObserver) { final String parent = request.getParent(); final String group = removePrefix("groups/", parent); xdsResourceManager.checkGroup(group); @@ -83,7 +84,7 @@ public void createWatcher(CreateWatcherRequest request, StreamObserver } final String watcherName = parent + K8S_WATCHERS_DIRECTORY + watcherId; - final Watcher watcher = request.getWatcher().toBuilder().setName(watcherName).build(); + final ServiceEndpointWatcher watcher = request.getWatcher().toBuilder().setName(watcherName).build(); final Author author = currentAuthor(); validateWatcherAndPush(responseObserver, watcher, () -> xdsResourceManager.push( responseObserver, group, K8S_WATCHERS_DIRECTORY + watcherId + ".json", @@ -91,7 +92,8 @@ public void createWatcher(CreateWatcherRequest request, StreamObserver } private static void validateWatcherAndPush( - StreamObserver responseObserver, Watcher watcher, Runnable onSuccess) { + StreamObserver responseObserver, + ServiceEndpointWatcher watcher, Runnable onSuccess) { // Create a KubernetesEndpointGroup to check if the watcher is valid. // We use KubernetesEndpointGroup for simplicity, but we will implement a custom implementation // for better debugging and error handling in the future. @@ -131,11 +133,11 @@ private static void validateWatcherAndPush( } /** - * Creates a {@link KubernetesEndpointGroup} from the specified {@link Watcher}. + * Creates a {@link KubernetesEndpointGroup} from the specified {@link ServiceEndpointWatcher}. * This method must be executed in a blocking thread because * {@link KubernetesEndpointGroupBuilder#build()} blocks the execution thread. */ - public static KubernetesEndpointGroup createKubernetesEndpointGroup(Watcher watcher) { + public static KubernetesEndpointGroup createKubernetesEndpointGroup(ServiceEndpointWatcher watcher) { final KubernetesConfig kubernetesConfig = watcher.getKubernetesConfig(); final String serviceName = watcher.getServiceName(); @@ -165,8 +167,9 @@ private static Config toConfig(KubernetesConfig kubernetesConfig) { @Blocking @Override - public void updateWatcher(UpdateWatcherRequest request, StreamObserver responseObserver) { - final Watcher watcher = request.getWatcher(); + public void updateServiceEndpointWatcher(UpdateServiceEndpointWatcherRequest request, + StreamObserver responseObserver) { + final ServiceEndpointWatcher watcher = request.getWatcher(); final String watcherName = watcher.getName(); final String group = checkWatcherName(watcherName).group(1); xdsResourceManager.checkGroup(group); @@ -186,7 +189,8 @@ private static Matcher checkWatcherName(String watcherName) { } @Override - public void deleteWatcher(DeleteWatcherRequest request, StreamObserver responseObserver) { + public void deleteServiceEndpointWatcher(DeleteServiceEndpointWatcherRequest request, + StreamObserver responseObserver) { final String watcherName = request.getName(); final String group = checkWatcherName(watcherName).group(1); xdsResourceManager.checkGroup(group); diff --git a/xds/src/main/proto/centraldogma/xds/k8s/v1/xds_kubernetes.proto b/xds/src/main/proto/centraldogma/xds/k8s/v1/xds_kubernetes.proto index 21aecbfbe4..aa7f7b4fcd 100644 --- a/xds/src/main/proto/centraldogma/xds/k8s/v1/xds_kubernetes.proto +++ b/xds/src/main/proto/centraldogma/xds/k8s/v1/xds_kubernetes.proto @@ -26,7 +26,7 @@ import "google/protobuf/empty.proto"; service XdsKubernetesService { - rpc CreateWatcher(CreateWatcherRequest) returns (Watcher) { + rpc CreateServiceEndpointWatcher(CreateServiceEndpointWatcherRequest) returns (ServiceEndpointWatcher) { option (google.api.http) = { post: "/api/v1/xds/{parent=groups/*}/k8s/watchers" body: "watcher" @@ -34,31 +34,31 @@ service XdsKubernetesService { option (google.api.method_signature) = "project,watcher"; } - rpc UpdateWatcher(UpdateWatcherRequest) returns (Watcher) { + rpc UpdateServiceEndpointWatcher(UpdateServiceEndpointWatcherRequest) returns (ServiceEndpointWatcher) { option (google.api.http) = { patch: "/api/v1/xds/{watcher.name=groups/*/k8s/watchers/**}" body: "watcher" }; } - rpc DeleteWatcher(DeleteWatcherRequest) returns (google.protobuf.Empty) { + rpc DeleteServiceEndpointWatcher(DeleteServiceEndpointWatcherRequest) returns (google.protobuf.Empty) { option (google.api.http) = { delete: "/api/v1/xds/{name=groups/*/k8s/watchers/**}" }; } } -message CreateWatcherRequest { +message CreateServiceEndpointWatcherRequest { // The parent resource where this watcher will be created. // Format: groups/{group} string parent = 1 [(google.api.field_behavior) = REQUIRED]; // Valid pattern is "^[a-z]([a-z0-9-/]*[a-z0-9])?$" string watcher_id = 2 [(google.api.field_behavior) = REQUIRED]; - Watcher watcher = 3 [(google.api.field_behavior) = REQUIRED]; + ServiceEndpointWatcher watcher = 3 [(google.api.field_behavior) = REQUIRED]; } -message UpdateWatcherRequest { - Watcher watcher = 1 [(google.api.field_behavior) = REQUIRED]; +message UpdateServiceEndpointWatcherRequest { + ServiceEndpointWatcher watcher = 1 [(google.api.field_behavior) = REQUIRED]; // TODO(minwoox): Implement these fields // google.protobuf.FieldMask update_mask = 2; @@ -66,7 +66,7 @@ message UpdateWatcherRequest { // bool allow_missing = 3; } -message DeleteWatcherRequest { +message DeleteServiceEndpointWatcherRequest { string name = 1 [(google.api.field_behavior) = REQUIRED]; } @@ -77,7 +77,7 @@ message KubernetesConfig { bool trust_certs = 4 [(google.api.field_behavior) = OPTIONAL]; } -message Watcher { +message ServiceEndpointWatcher { // The resource name of the watcher. // Format: groups/{group}/k8s/watchers/{watcher}_id string name = 1 [(google.api.field_behavior) = IDENTIFIER]; diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesServiceTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesServiceTest.java index 31c770f7f3..499fa010a4 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesServiceTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesServiceTest.java @@ -47,7 +47,6 @@ import com.linecorp.centraldogma.common.Query; import com.linecorp.centraldogma.common.Revision; import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; -import com.linecorp.centraldogma.xds.k8s.v1.Watcher.Builder; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; @@ -130,7 +129,7 @@ static void cleanupK8sResources() { @Test void invalidProperty() throws IOException { final String watcherId = "foo-cluster"; - final Watcher watcher = watcher(watcherId, "invalid-service-name"); + final ServiceEndpointWatcher watcher = watcher(watcherId, "invalid-service-name"); final AggregatedHttpResponse response = createWatcher(watcher, watcherId); assertThat(response.status()).isSameAs(HttpStatus.INTERNAL_SERVER_ERROR); assertThat(response.contentUtf8()).contains("Failed to retrieve k8s endpoints"); @@ -139,7 +138,7 @@ void invalidProperty() throws IOException { @Test void createWatcherRequest() throws IOException { final String watcherId = "foo-k8s-cluster/1"; - final Watcher watcher = watcher(watcherId); + final ServiceEndpointWatcher watcher = watcher(watcherId); final AggregatedHttpResponse response = createWatcher(watcher, watcherId); assertOk(response); final String json = response.contentUtf8(); @@ -154,25 +153,26 @@ void createWatcherRequest() throws IOException { checkEndpointsViaDiscoveryRequest(dogma.httpClient().uri(), loadAssignment, clusterName); } - private static Watcher watcher(String watcherId) { + private static ServiceEndpointWatcher watcher(String watcherId) { return watcher(watcherId, "nginx-service"); } - private static Watcher watcher(String watcherId, String serviceName) { + private static ServiceEndpointWatcher watcher(String watcherId, String serviceName) { final KubernetesConfig kubernetesConfig = KubernetesConfig.newBuilder() .setControlPlaneUrl(client.getMasterUrl().toString()) .setNamespace(client.getNamespace()) .setTrustCerts(true) .build(); - return Watcher.newBuilder() - .setName("groups/foo/k8s/watchers/" + watcherId) - .setServiceName(serviceName) - .setKubernetesConfig(kubernetesConfig) - .build(); + return ServiceEndpointWatcher.newBuilder() + .setName("groups/foo/k8s/watchers/" + watcherId) + .setServiceName(serviceName) + .setKubernetesConfig(kubernetesConfig) + .build(); } - private static AggregatedHttpResponse createWatcher(Watcher watcher, String watcherId) throws IOException { + private static AggregatedHttpResponse createWatcher( + ServiceEndpointWatcher watcher, String watcherId) throws IOException { final RequestHeaders headers = RequestHeaders.builder(HttpMethod.POST, "/api/v1/xds/groups/foo/k8s/watchers?watcher_id=" + watcherId) @@ -189,8 +189,8 @@ private static void assertOk(AggregatedHttpResponse response) { assertThat(response.headers().get("grpc-status")).isEqualTo("0"); } - private static void assertWatcher(String json, Watcher expected) throws IOException { - final Builder responseWatcherBuilder = Watcher.newBuilder(); + private static void assertWatcher(String json, ServiceEndpointWatcher expected) throws IOException { + final ServiceEndpointWatcher.Builder responseWatcherBuilder = ServiceEndpointWatcher.newBuilder(); JSON_MESSAGE_MARSHALLER.mergeValue(json, responseWatcherBuilder); assertThat(responseWatcherBuilder.build()).isEqualTo(expected); } @@ -209,14 +209,14 @@ private static ClusterLoadAssignment clusterLoadAssignment(String clusterName, i @Test void updateWatcher() throws IOException { final String watcherId = "foo-k8s-cluster/2"; - final Watcher watcher = watcher(watcherId); + final ServiceEndpointWatcher watcher = watcher(watcherId); AggregatedHttpResponse response = createWatcher(watcher, watcherId); assertOk(response); final String clusterName = "groups/foo/k8s/clusters/" + watcherId; final ClusterLoadAssignment loadAssignment = clusterLoadAssignment(clusterName, 30000); checkEndpointsViaDiscoveryRequest(dogma.httpClient().uri(), loadAssignment, clusterName); - final Watcher updatingWatcher = watcher.toBuilder().setPortName("https").build(); + final ServiceEndpointWatcher updatingWatcher = watcher.toBuilder().setPortName("https").build(); response = updateWatcher(updatingWatcher, watcherId, dogma.httpClient()); assertOk(response); @@ -226,7 +226,7 @@ void updateWatcher() throws IOException { } public static AggregatedHttpResponse updateWatcher( - Watcher watcher, String watcherId, WebClient webClient) throws IOException { + ServiceEndpointWatcher watcher, String watcherId, WebClient webClient) throws IOException { final RequestHeaders headers = RequestHeaders.builder(HttpMethod.PATCH, "/api/v1/xds/groups/foo/k8s/watchers/" + watcherId) @@ -239,7 +239,7 @@ public static AggregatedHttpResponse updateWatcher( @Test void deleteWatcher() throws IOException { final String watcherId = "foo-k8s-cluster/3"; - final Watcher watcher = watcher(watcherId); + final ServiceEndpointWatcher watcher = watcher(watcherId); AggregatedHttpResponse response = createWatcher(watcher, watcherId); assertOk(response); final String clusterName = "groups/foo/k8s/clusters/" + watcherId;