Skip to content

Commit

Permalink
Address comments from @ikhoon
Browse files Browse the repository at this point in the history
  • Loading branch information
minwoox committed Aug 14, 2024
1 parent 60c1888 commit 3eb7c78
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,9 @@ public final class ControlPlanePlugin extends AllReplicasPlugin {
@Nullable
private volatile ControlPlaneService controlPlaneService;

public static final long BACKOFF_SECONDS = 30;

@Override
public void init(PluginInitContext pluginInitContext) {
final InternalProjectInitializer projectInitializer = pluginInitContext.internalProjectInitializer();
pluginInitContext.commandExecutor();
projectInitializer.initialize(XDS_CENTRAL_DOGMA_PROJECT);
final ControlPlaneService controlPlaneService = new ControlPlaneService(
pluginInitContext.projectManager().get(XDS_CENTRAL_DOGMA_PROJECT),
Expand All @@ -61,8 +58,9 @@ public CompletionStage<Void> start(PluginContext context) {
@Override
public CompletionStage<Void> stop(PluginContext context) {
final ControlPlaneService controlPlaneService = this.controlPlaneService;
assert controlPlaneService != null;
controlPlaneService.stop();
if (controlPlaneService != null) {
controlPlaneService.stop();
}
return UnmodifiableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public final class ControlPlaneService extends XdsResourceWatchingService {

Future<Void> start(PluginInitContext pluginInitContext) {
return controlPlaneExecutor.submit(() -> {
start();
init();
final CommandExecutor commandExecutor = pluginInitContext.commandExecutor();
final V3DiscoveryServer server = new V3DiscoveryServer(new LoggingDiscoveryServerCallbacks(),
cache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
import com.linecorp.centraldogma.server.command.CommandExecutor;
import com.linecorp.centraldogma.server.storage.project.Project;
import com.linecorp.centraldogma.server.storage.repository.Repository;
import com.linecorp.centraldogma.xds.k8s.v1.CreateWatcherRequest;
import com.linecorp.centraldogma.xds.k8s.v1.DeleteWatcherRequest;
import com.linecorp.centraldogma.xds.k8s.v1.UpdateWatcherRequest;
import com.linecorp.centraldogma.xds.k8s.v1.CreateServiceEndpointWatcherRequest;
import com.linecorp.centraldogma.xds.k8s.v1.DeleteServiceEndpointWatcherRequest;
import com.linecorp.centraldogma.xds.k8s.v1.UpdateServiceEndpointWatcherRequest;

import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
Expand All @@ -64,9 +64,9 @@ public final class XdsResourceManager {
.register(Cluster.getDefaultInstance())
.register(ClusterLoadAssignment.getDefaultInstance())
.register(RouteConfiguration.getDefaultInstance())
.register(CreateWatcherRequest.getDefaultInstance())
.register(UpdateWatcherRequest.getDefaultInstance())
.register(DeleteWatcherRequest.getDefaultInstance())
.register(CreateServiceEndpointWatcherRequest.getDefaultInstance())
.register(UpdateServiceEndpointWatcherRequest.getDefaultInstance())
.register(DeleteServiceEndpointWatcherRequest.getDefaultInstance())
// extensions
.register(Router.getDefaultInstance())
.register(HttpConnectionManager.getDefaultInstance())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected abstract void handleXdsResource(String path, String contentAsText, Str
/**
* Must be executed by {@link #executor()}.
*/
protected void start() {
protected void init() {
for (Repository repository : xdsProject.repos().list().values()) {
final String groupName = repository.name();
if (Project.internalRepos().contains(groupName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.util.Objects.requireNonNull;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -56,6 +57,11 @@ public synchronized CompletionStage<Void> start(PluginContext context) {
fetchingService = new XdsKubernetesEndpointFetchingService(
context.projectManager().get(XDS_CENTRAL_DOGMA_PROJECT), context.commandExecutor(),
context.meterRegistry());
try {
fetchingService.start().get(60, TimeUnit.SECONDS);
} catch (Throwable t) {
throw new RuntimeException("Failed to start control plane plugin in 60 seconds.", t);
}
return UnmodifiableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -65,7 +66,6 @@ final class XdsKubernetesEndpointFetchingService extends XdsResourceWatchingServ
private static final Pattern WATCHERS_PATTERN = Pattern.compile("(?<=/k8s)/watchers/");

private final CommandExecutor commandExecutor;
private final MeterRegistry meterRegistry;

// Only accessed by the executorService.
private final Map<String, Map<String, KubernetesEndpointGroup>> kubernetesWatchers = new HashMap<>();
Expand All @@ -77,14 +77,16 @@ final class XdsKubernetesEndpointFetchingService extends XdsResourceWatchingServ
MeterRegistry meterRegistry) {
super(xdsProject);
this.commandExecutor = commandExecutor;
this.meterRegistry = meterRegistry;
executorService = ExecutorServiceMetrics.monitor(
meterRegistry, Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("k8s-plugin-executor", true)), "k8sPluginExecutor");
executorService.execute(this::start);
}

synchronized void stop() {
Future<?> start() {
return executorService.submit(this::init);
}

void stop() {
stopped = true;
executorService.submit(() -> {
kubernetesWatchers.values().forEach(map -> {
Expand All @@ -108,20 +110,20 @@ protected String pathPattern() {
@Override
protected void handleXdsResource(String path, String contentAsText, String groupName)
throws InvalidProtocolBufferException {
final Watcher.Builder watcherBuilder = Watcher.newBuilder();
final ServiceEndpointWatcher.Builder watcherBuilder = ServiceEndpointWatcher.newBuilder();
try {
JSON_MESSAGE_MARSHALLER.mergeValue(contentAsText, watcherBuilder);
} catch (IOException e) {
logger.warn("Failed to parse a Watcher at {}{}. content: {}",
groupName, path, contentAsText, e);
return;
}
final Watcher k8sWatcher = watcherBuilder.build();
final KubernetesEndpointGroup kubernetesEndpointGroup = createKubernetesEndpointGroup(k8sWatcher);
final ServiceEndpointWatcher endpointWatcher = watcherBuilder.build();
final KubernetesEndpointGroup kubernetesEndpointGroup = createKubernetesEndpointGroup(endpointWatcher);
final Map<String, KubernetesEndpointGroup> watchers =
kubernetesWatchers.computeIfAbsent(groupName, unused -> new HashMap<>());

final String watcherName = k8sWatcher.getName();
final String watcherName = endpointWatcher.getName();
final KubernetesEndpointGroup oldWatcher = watchers.get(watcherName);
if (oldWatcher != null) {
oldWatcher.closeAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public XdsKubernetesService(XdsResourceManager xdsResourceManager) {

@Blocking
@Override
public void createWatcher(CreateWatcherRequest request, StreamObserver<Watcher> responseObserver) {
public void createServiceEndpointWatcher(CreateServiceEndpointWatcherRequest request,
StreamObserver<ServiceEndpointWatcher> responseObserver) {
final String parent = request.getParent();
final String group = removePrefix("groups/", parent);
xdsResourceManager.checkGroup(group);
Expand All @@ -83,15 +84,16 @@ public void createWatcher(CreateWatcherRequest request, StreamObserver<Watcher>
}

final String watcherName = parent + K8S_WATCHERS_DIRECTORY + watcherId;
final Watcher watcher = request.getWatcher().toBuilder().setName(watcherName).build();
final ServiceEndpointWatcher watcher = request.getWatcher().toBuilder().setName(watcherName).build();
final Author author = currentAuthor();
validateWatcherAndPush(responseObserver, watcher, () -> xdsResourceManager.push(
responseObserver, group, K8S_WATCHERS_DIRECTORY + watcherId + ".json",
"Create watcher: " + watcherName, watcher, author));
}

private static void validateWatcherAndPush(
StreamObserver<Watcher> responseObserver, Watcher watcher, Runnable onSuccess) {
StreamObserver<ServiceEndpointWatcher> responseObserver,
ServiceEndpointWatcher watcher, Runnable onSuccess) {
// Create a KubernetesEndpointGroup to check if the watcher is valid.
// We use KubernetesEndpointGroup for simplicity, but we will implement a custom implementation
// for better debugging and error handling in the future.
Expand Down Expand Up @@ -131,11 +133,11 @@ private static void validateWatcherAndPush(
}

/**
* Creates a {@link KubernetesEndpointGroup} from the specified {@link Watcher}.
* Creates a {@link KubernetesEndpointGroup} from the specified {@link ServiceEndpointWatcher}.
* This method must be executed in a blocking thread because
* {@link KubernetesEndpointGroupBuilder#build()} blocks the execution thread.
*/
public static KubernetesEndpointGroup createKubernetesEndpointGroup(Watcher watcher) {
public static KubernetesEndpointGroup createKubernetesEndpointGroup(ServiceEndpointWatcher watcher) {
final KubernetesConfig kubernetesConfig = watcher.getKubernetesConfig();
final String serviceName = watcher.getServiceName();

Expand Down Expand Up @@ -165,8 +167,9 @@ private static Config toConfig(KubernetesConfig kubernetesConfig) {

@Blocking
@Override
public void updateWatcher(UpdateWatcherRequest request, StreamObserver<Watcher> responseObserver) {
final Watcher watcher = request.getWatcher();
public void updateServiceEndpointWatcher(UpdateServiceEndpointWatcherRequest request,
StreamObserver<ServiceEndpointWatcher> responseObserver) {
final ServiceEndpointWatcher watcher = request.getWatcher();
final String watcherName = watcher.getName();
final String group = checkWatcherName(watcherName).group(1);
xdsResourceManager.checkGroup(group);
Expand All @@ -186,7 +189,8 @@ private static Matcher checkWatcherName(String watcherName) {
}

@Override
public void deleteWatcher(DeleteWatcherRequest request, StreamObserver<Empty> responseObserver) {
public void deleteServiceEndpointWatcher(DeleteServiceEndpointWatcherRequest request,
StreamObserver<Empty> responseObserver) {
final String watcherName = request.getName();
final String group = checkWatcherName(watcherName).group(1);
xdsResourceManager.checkGroup(group);
Expand Down
18 changes: 9 additions & 9 deletions xds/src/main/proto/centraldogma/xds/k8s/v1/xds_kubernetes.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,47 +26,47 @@ import "google/protobuf/empty.proto";

service XdsKubernetesService {

rpc CreateWatcher(CreateWatcherRequest) returns (Watcher) {
rpc CreateServiceEndpointWatcher(CreateServiceEndpointWatcherRequest) returns (ServiceEndpointWatcher) {
option (google.api.http) = {
post: "/api/v1/xds/{parent=groups/*}/k8s/watchers"
body: "watcher"
};
option (google.api.method_signature) = "project,watcher";
}

rpc UpdateWatcher(UpdateWatcherRequest) returns (Watcher) {
rpc UpdateServiceEndpointWatcher(UpdateServiceEndpointWatcherRequest) returns (ServiceEndpointWatcher) {
option (google.api.http) = {
patch: "/api/v1/xds/{watcher.name=groups/*/k8s/watchers/**}"
body: "watcher"
};
}

rpc DeleteWatcher(DeleteWatcherRequest) returns (google.protobuf.Empty) {
rpc DeleteServiceEndpointWatcher(DeleteServiceEndpointWatcherRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
delete: "/api/v1/xds/{name=groups/*/k8s/watchers/**}"
};
}
}

message CreateWatcherRequest {
message CreateServiceEndpointWatcherRequest {
// The parent resource where this watcher will be created.
// Format: groups/{group}
string parent = 1 [(google.api.field_behavior) = REQUIRED];
// Valid pattern is "^[a-z]([a-z0-9-/]*[a-z0-9])?$"
string watcher_id = 2 [(google.api.field_behavior) = REQUIRED];
Watcher watcher = 3 [(google.api.field_behavior) = REQUIRED];
ServiceEndpointWatcher watcher = 3 [(google.api.field_behavior) = REQUIRED];
}

message UpdateWatcherRequest {
Watcher watcher = 1 [(google.api.field_behavior) = REQUIRED];
message UpdateServiceEndpointWatcherRequest {
ServiceEndpointWatcher watcher = 1 [(google.api.field_behavior) = REQUIRED];

// TODO(minwoox): Implement these fields
// google.protobuf.FieldMask update_mask = 2;
//
// bool allow_missing = 3;
}

message DeleteWatcherRequest {
message DeleteServiceEndpointWatcherRequest {
string name = 1 [(google.api.field_behavior) = REQUIRED];
}

Expand All @@ -77,7 +77,7 @@ message KubernetesConfig {
bool trust_certs = 4 [(google.api.field_behavior) = OPTIONAL];
}

message Watcher {
message ServiceEndpointWatcher {
// The resource name of the watcher.
// Format: groups/{group}/k8s/watchers/{watcher}_id
string name = 1 [(google.api.field_behavior) = IDENTIFIER];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import com.linecorp.centraldogma.common.Query;
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension;
import com.linecorp.centraldogma.xds.k8s.v1.Watcher.Builder;

import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints;
Expand Down Expand Up @@ -130,7 +129,7 @@ static void cleanupK8sResources() {
@Test
void invalidProperty() throws IOException {
final String watcherId = "foo-cluster";
final Watcher watcher = watcher(watcherId, "invalid-service-name");
final ServiceEndpointWatcher watcher = watcher(watcherId, "invalid-service-name");
final AggregatedHttpResponse response = createWatcher(watcher, watcherId);
assertThat(response.status()).isSameAs(HttpStatus.INTERNAL_SERVER_ERROR);
assertThat(response.contentUtf8()).contains("Failed to retrieve k8s endpoints");
Expand All @@ -139,7 +138,7 @@ void invalidProperty() throws IOException {
@Test
void createWatcherRequest() throws IOException {
final String watcherId = "foo-k8s-cluster/1";
final Watcher watcher = watcher(watcherId);
final ServiceEndpointWatcher watcher = watcher(watcherId);
final AggregatedHttpResponse response = createWatcher(watcher, watcherId);
assertOk(response);
final String json = response.contentUtf8();
Expand All @@ -154,25 +153,26 @@ void createWatcherRequest() throws IOException {
checkEndpointsViaDiscoveryRequest(dogma.httpClient().uri(), loadAssignment, clusterName);
}

private static Watcher watcher(String watcherId) {
private static ServiceEndpointWatcher watcher(String watcherId) {
return watcher(watcherId, "nginx-service");
}

private static Watcher watcher(String watcherId, String serviceName) {
private static ServiceEndpointWatcher watcher(String watcherId, String serviceName) {
final KubernetesConfig kubernetesConfig =
KubernetesConfig.newBuilder()
.setControlPlaneUrl(client.getMasterUrl().toString())
.setNamespace(client.getNamespace())
.setTrustCerts(true)
.build();
return Watcher.newBuilder()
.setName("groups/foo/k8s/watchers/" + watcherId)
.setServiceName(serviceName)
.setKubernetesConfig(kubernetesConfig)
.build();
return ServiceEndpointWatcher.newBuilder()
.setName("groups/foo/k8s/watchers/" + watcherId)
.setServiceName(serviceName)
.setKubernetesConfig(kubernetesConfig)
.build();
}

private static AggregatedHttpResponse createWatcher(Watcher watcher, String watcherId) throws IOException {
private static AggregatedHttpResponse createWatcher(
ServiceEndpointWatcher watcher, String watcherId) throws IOException {
final RequestHeaders headers =
RequestHeaders.builder(HttpMethod.POST,
"/api/v1/xds/groups/foo/k8s/watchers?watcher_id=" + watcherId)
Expand All @@ -189,8 +189,8 @@ private static void assertOk(AggregatedHttpResponse response) {
assertThat(response.headers().get("grpc-status")).isEqualTo("0");
}

private static void assertWatcher(String json, Watcher expected) throws IOException {
final Builder responseWatcherBuilder = Watcher.newBuilder();
private static void assertWatcher(String json, ServiceEndpointWatcher expected) throws IOException {
final ServiceEndpointWatcher.Builder responseWatcherBuilder = ServiceEndpointWatcher.newBuilder();
JSON_MESSAGE_MARSHALLER.mergeValue(json, responseWatcherBuilder);
assertThat(responseWatcherBuilder.build()).isEqualTo(expected);
}
Expand All @@ -209,14 +209,14 @@ private static ClusterLoadAssignment clusterLoadAssignment(String clusterName, i
@Test
void updateWatcher() throws IOException {
final String watcherId = "foo-k8s-cluster/2";
final Watcher watcher = watcher(watcherId);
final ServiceEndpointWatcher watcher = watcher(watcherId);
AggregatedHttpResponse response = createWatcher(watcher, watcherId);
assertOk(response);
final String clusterName = "groups/foo/k8s/clusters/" + watcherId;
final ClusterLoadAssignment loadAssignment = clusterLoadAssignment(clusterName, 30000);
checkEndpointsViaDiscoveryRequest(dogma.httpClient().uri(), loadAssignment, clusterName);

final Watcher updatingWatcher = watcher.toBuilder().setPortName("https").build();
final ServiceEndpointWatcher updatingWatcher = watcher.toBuilder().setPortName("https").build();
response = updateWatcher(updatingWatcher, watcherId, dogma.httpClient());

assertOk(response);
Expand All @@ -226,7 +226,7 @@ void updateWatcher() throws IOException {
}

public static AggregatedHttpResponse updateWatcher(
Watcher watcher, String watcherId, WebClient webClient) throws IOException {
ServiceEndpointWatcher watcher, String watcherId, WebClient webClient) throws IOException {
final RequestHeaders headers =
RequestHeaders.builder(HttpMethod.PATCH,
"/api/v1/xds/groups/foo/k8s/watchers/" + watcherId)
Expand All @@ -239,7 +239,7 @@ public static AggregatedHttpResponse updateWatcher(
@Test
void deleteWatcher() throws IOException {
final String watcherId = "foo-k8s-cluster/3";
final Watcher watcher = watcher(watcherId);
final ServiceEndpointWatcher watcher = watcher(watcherId);
AggregatedHttpResponse response = createWatcher(watcher, watcherId);
assertOk(response);
final String clusterName = "groups/foo/k8s/clusters/" + watcherId;
Expand Down

0 comments on commit 3eb7c78

Please sign in to comment.