diff --git a/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java b/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java index 575c50ef..beb1ad9a 100644 --- a/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java +++ b/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java @@ -180,8 +180,12 @@ public boolean isNamespaceOwnerOfConnect(Namespace namespace, String connect) { */ public Mono> validateRemotely(Namespace namespace, Connector connector) { return kafkaConnectClient.validate(namespace.getMetadata().getCluster(), - connector.getSpec().getConnectCluster(), connector.getSpec().getConfig().get(CONNECTOR_CLASS), - ConnectorSpecs.builder().config(connector.getSpec().getConfig()).build()) + connector.getSpec().getConnectCluster(), + connector.getSpec().getConfig().get(CONNECTOR_CLASS), + ConnectorSpecs.builder() + .config(connector.getSpec().getConfig()) + .build() + ) .map(configInfos -> configInfos.configs() .stream() .filter(configInfo -> !configInfo.configValue().errors().isEmpty()) diff --git a/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java b/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java index 3f2a9818..3510e1ff 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java @@ -22,6 +22,8 @@ import jakarta.inject.Inject; import jakarta.inject.Singleton; import java.net.URI; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.Optional; @@ -60,8 +62,11 @@ public class KafkaConnectClient { */ public Mono> version(String kafkaCluster, String connectCluster) { KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster); - HttpRequest request = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), "/"))) + + HttpRequest request = HttpRequest.GET( + URI.create(StringUtils.prependUri(config.getUrl(), "/"))) .basicAuth(config.getUsername(), config.getPassword()); + return Mono.from(httpClient.exchange(request, ServerInfo.class)); } @@ -74,9 +79,12 @@ public Mono> version(String kafkaCluster, String connec */ public Mono> listAll(String kafkaCluster, String connectCluster) { KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster); + HttpRequest request = HttpRequest.GET( - URI.create(StringUtils.prependUri(config.getUrl(), "/connectors?expand=info&expand=status"))) + URI.create(StringUtils.prependUri(config.getUrl(), + "/connectors?expand=info&expand=status"))) .basicAuth(config.getUsername(), config.getPassword()); + return Mono.from(httpClient.retrieve(request, Argument.mapOf(String.class, ConnectorStatus.class))); } @@ -89,14 +97,19 @@ public Mono> listAll(String kafkaCluster, String co * @param connectorSpecs The connector config * @return The configuration infos */ - public Mono validate(String kafkaCluster, String connectCluster, String connectorClass, + public Mono validate(String kafkaCluster, + String connectCluster, + String connectorClass, ConnectorSpecs connectorSpecs) { KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster); - HttpRequest request = HttpRequest.PUT(URI.create( - StringUtils.prependUri(config.getUrl(), "/connector-plugins/" - + connectorClass + "/config/validate")), + String encodedConnectorClass = URLEncoder.encode(connectorClass, StandardCharsets.UTF_8); + + HttpRequest request = HttpRequest.PUT( + URI.create(StringUtils.prependUri(config.getUrl(), + "/connector-plugins/" + encodedConnectorClass + "/config/validate")), connectorSpecs) .basicAuth(config.getUsername(), config.getPassword()); + return Mono.from(httpClient.retrieve(request, ConfigInfos.class)); } @@ -109,13 +122,19 @@ public Mono validate(String kafkaCluster, String connectCluster, St * @param connectorSpecs The connector config * @return The creation or update response */ - public Mono createOrUpdate(String kafkaCluster, String connectCluster, String connector, + public Mono createOrUpdate(String kafkaCluster, + String connectCluster, + String connector, ConnectorSpecs connectorSpecs) { KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster); - HttpRequest request = - HttpRequest.PUT(URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + connector + "/config")), - connectorSpecs) - .basicAuth(config.getUsername(), config.getPassword()); + String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8); + + HttpRequest request = HttpRequest.PUT( + URI.create(StringUtils.prependUri(config.getUrl(), + CONNECTORS + encodedConnector + "/config")), + connectorSpecs) + .basicAuth(config.getUsername(), config.getPassword()); + return Mono.from(httpClient.retrieve(request, ConnectorInfo.class)); } @@ -129,9 +148,13 @@ public Mono createOrUpdate(String kafkaCluster, String connectClu */ public Mono> delete(String kafkaCluster, String connectCluster, String connector) { KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster); - HttpRequest request = - HttpRequest.DELETE(URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + connector))) - .basicAuth(config.getUsername(), config.getPassword()); + String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8); + + HttpRequest request = HttpRequest.DELETE( + URI.create(StringUtils.prependUri(config.getUrl(), + CONNECTORS + encodedConnector))) + .basicAuth(config.getUsername(), config.getPassword()); + return Mono.from(httpClient.exchange(request, Void.class)); } @@ -144,9 +167,12 @@ public Mono> delete(String kafkaCluster, String connectCluste */ public Mono> connectPlugins(String kafkaCluster, String connectCluster) { KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster); - HttpRequest request = - HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), "/connector-plugins"))) - .basicAuth(config.getUsername(), config.getPassword()); + + HttpRequest request = HttpRequest.GET( + URI.create(StringUtils.prependUri(config.getUrl(), + "/connector-plugins"))) + .basicAuth(config.getUsername(), config.getPassword()); + return Mono.from(httpClient.retrieve(request, Argument.listOf(ConnectorPluginInfo.class))); } @@ -160,9 +186,13 @@ public Mono> connectPlugins(String kafkaCluster, Strin */ public Mono status(String kafkaCluster, String connectCluster, String connector) { KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster); - HttpRequest request = - HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + connector + "/status"))) - .basicAuth(config.getUsername(), config.getPassword()); + String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8); + + HttpRequest request = HttpRequest.GET( + URI.create(StringUtils.prependUri(config.getUrl(), + CONNECTORS + encodedConnector + "/status"))) + .basicAuth(config.getUsername(), config.getPassword()); + return Mono.from(httpClient.retrieve(request, ConnectorStateInfo.class)); } @@ -177,10 +207,13 @@ public Mono status(String kafkaCluster, String connectCluste */ public Mono> restart(String kafkaCluster, String connectCluster, String connector, int taskId) { KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster); - HttpRequest request = HttpRequest.POST(URI.create( - StringUtils.prependUri(config.getUrl(), CONNECTORS + connector + "/tasks/" - + taskId + "/restart")), null) + String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8); + + HttpRequest request = HttpRequest.POST( + URI.create(StringUtils.prependUri(config.getUrl(), + CONNECTORS + encodedConnector + "/tasks/" + taskId + "/restart")), null) .basicAuth(config.getUsername(), config.getPassword()); + return Mono.from(httpClient.exchange(request, Void.class)); } @@ -194,10 +227,13 @@ public Mono> restart(String kafkaCluster, String connectClust */ public Mono> pause(String kafkaCluster, String connectCluster, String connector) { KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster); - HttpRequest request = - HttpRequest.PUT(URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + connector + "/pause")), - null) - .basicAuth(config.getUsername(), config.getPassword()); + String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8); + + HttpRequest request = HttpRequest.PUT( + URI.create(StringUtils.prependUri(config.getUrl(), + CONNECTORS + encodedConnector + "/pause")), null) + .basicAuth(config.getUsername(), config.getPassword()); + return Mono.from(httpClient.exchange(request, Void.class)); } @@ -211,10 +247,13 @@ public Mono> pause(String kafkaCluster, String connectCluster */ public Mono> resume(String kafkaCluster, String connectCluster, String connector) { KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster); - HttpRequest request = - HttpRequest.PUT(URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + connector + "/resume")), - null) - .basicAuth(config.getUsername(), config.getPassword()); + String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8); + + HttpRequest request = HttpRequest.PUT( + URI.create(StringUtils.prependUri(config.getUrl(), + CONNECTORS + encodedConnector + "/resume")), null) + .basicAuth(config.getUsername(), config.getPassword()); + return Mono.from(httpClient.exchange(request, Void.class)); } diff --git a/src/main/java/com/michelin/ns4kafka/service/executor/ConnectorAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/service/executor/ConnectorAsyncExecutor.java index a7dc9d56..4d0a9b51 100644 --- a/src/main/java/com/michelin/ns4kafka/service/executor/ConnectorAsyncExecutor.java +++ b/src/main/java/com/michelin/ns4kafka/service/executor/ConnectorAsyncExecutor.java @@ -258,7 +258,8 @@ private boolean connectorsAreSame(Connector expected, Connector actual) { private Mono deployConnector(Connector connector) { return kafkaConnectClient.createOrUpdate(managedClusterProperties.getName(), connector.getSpec().getConnectCluster(), - connector.getMetadata().getName(), ConnectorSpecs.builder() + connector.getMetadata().getName(), + ConnectorSpecs.builder() .config(connector.getSpec().getConfig()) .build()) .doOnSuccess(