Skip to content

Commit

Permalink
Fix SSRF on Kafka Connect client (#474)
Browse files Browse the repository at this point in the history
  • Loading branch information
loicgreffier authored Oct 23, 2024
1 parent 1d844e5 commit deef285
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,12 @@ public boolean isNamespaceOwnerOfConnect(Namespace namespace, String connect) {
*/
public Mono<List<String>> validateRemotely(Namespace namespace, Connector connector) {
return kafkaConnectClient.validate(namespace.getMetadata().getCluster(),
connector.getSpec().getConnectCluster(), connector.getSpec().getConfig().get(CONNECTOR_CLASS),
ConnectorSpecs.builder().config(connector.getSpec().getConfig()).build())
connector.getSpec().getConnectCluster(),
connector.getSpec().getConfig().get(CONNECTOR_CLASS),
ConnectorSpecs.builder()
.config(connector.getSpec().getConfig())
.build()
)
.map(configInfos -> configInfos.configs()
.stream()
.filter(configInfo -> !configInfo.configValue().errors().isEmpty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -60,8 +62,11 @@ public class KafkaConnectClient {
*/
public Mono<HttpResponse<ServerInfo>> version(String kafkaCluster, String connectCluster) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), "/")))

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(), "/")))
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.exchange(request, ServerInfo.class));
}

Expand All @@ -74,9 +79,12 @@ public Mono<HttpResponse<ServerInfo>> version(String kafkaCluster, String connec
*/
public Mono<Map<String, ConnectorStatus>> listAll(String kafkaCluster, String connectCluster) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(), "/connectors?expand=info&expand=status")))
URI.create(StringUtils.prependUri(config.getUrl(),
"/connectors?expand=info&expand=status")))
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.retrieve(request, Argument.mapOf(String.class, ConnectorStatus.class)));
}

Expand All @@ -89,14 +97,19 @@ public Mono<Map<String, ConnectorStatus>> listAll(String kafkaCluster, String co
* @param connectorSpecs The connector config
* @return The configuration infos
*/
public Mono<ConfigInfos> validate(String kafkaCluster, String connectCluster, String connectorClass,
public Mono<ConfigInfos> validate(String kafkaCluster,
String connectCluster,
String connectorClass,
ConnectorSpecs connectorSpecs) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request = HttpRequest.PUT(URI.create(
StringUtils.prependUri(config.getUrl(), "/connector-plugins/"
+ connectorClass + "/config/validate")),
String encodedConnectorClass = URLEncoder.encode(connectorClass, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.PUT(
URI.create(StringUtils.prependUri(config.getUrl(),
"/connector-plugins/" + encodedConnectorClass + "/config/validate")),
connectorSpecs)
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.retrieve(request, ConfigInfos.class));
}

Expand All @@ -109,13 +122,19 @@ public Mono<ConfigInfos> validate(String kafkaCluster, String connectCluster, St
* @param connectorSpecs The connector config
* @return The creation or update response
*/
public Mono<ConnectorInfo> createOrUpdate(String kafkaCluster, String connectCluster, String connector,
public Mono<ConnectorInfo> createOrUpdate(String kafkaCluster,
String connectCluster,
String connector,
ConnectorSpecs connectorSpecs) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request =
HttpRequest.PUT(URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + connector + "/config")),
connectorSpecs)
.basicAuth(config.getUsername(), config.getPassword());
String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.PUT(
URI.create(StringUtils.prependUri(config.getUrl(),
CONNECTORS + encodedConnector + "/config")),
connectorSpecs)
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.retrieve(request, ConnectorInfo.class));
}

Expand All @@ -129,9 +148,13 @@ public Mono<ConnectorInfo> createOrUpdate(String kafkaCluster, String connectClu
*/
public Mono<HttpResponse<Void>> delete(String kafkaCluster, String connectCluster, String connector) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request =
HttpRequest.DELETE(URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + connector)))
.basicAuth(config.getUsername(), config.getPassword());
String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.DELETE(
URI.create(StringUtils.prependUri(config.getUrl(),
CONNECTORS + encodedConnector)))
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.exchange(request, Void.class));
}

Expand All @@ -144,9 +167,12 @@ public Mono<HttpResponse<Void>> delete(String kafkaCluster, String connectCluste
*/
public Mono<List<ConnectorPluginInfo>> connectPlugins(String kafkaCluster, String connectCluster) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request =
HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), "/connector-plugins")))
.basicAuth(config.getUsername(), config.getPassword());

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(),
"/connector-plugins")))
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.retrieve(request, Argument.listOf(ConnectorPluginInfo.class)));
}

Expand All @@ -160,9 +186,13 @@ public Mono<List<ConnectorPluginInfo>> connectPlugins(String kafkaCluster, Strin
*/
public Mono<ConnectorStateInfo> status(String kafkaCluster, String connectCluster, String connector) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request =
HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + connector + "/status")))
.basicAuth(config.getUsername(), config.getPassword());
String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(),
CONNECTORS + encodedConnector + "/status")))
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.retrieve(request, ConnectorStateInfo.class));
}

Expand All @@ -177,10 +207,13 @@ public Mono<ConnectorStateInfo> status(String kafkaCluster, String connectCluste
*/
public Mono<HttpResponse<Void>> restart(String kafkaCluster, String connectCluster, String connector, int taskId) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request = HttpRequest.POST(URI.create(
StringUtils.prependUri(config.getUrl(), CONNECTORS + connector + "/tasks/"
+ taskId + "/restart")), null)
String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.POST(
URI.create(StringUtils.prependUri(config.getUrl(),
CONNECTORS + encodedConnector + "/tasks/" + taskId + "/restart")), null)
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.exchange(request, Void.class));
}

Expand All @@ -194,10 +227,13 @@ public Mono<HttpResponse<Void>> restart(String kafkaCluster, String connectClust
*/
public Mono<HttpResponse<Void>> pause(String kafkaCluster, String connectCluster, String connector) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request =
HttpRequest.PUT(URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + connector + "/pause")),
null)
.basicAuth(config.getUsername(), config.getPassword());
String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.PUT(
URI.create(StringUtils.prependUri(config.getUrl(),
CONNECTORS + encodedConnector + "/pause")), null)
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.exchange(request, Void.class));
}

Expand All @@ -211,10 +247,13 @@ public Mono<HttpResponse<Void>> pause(String kafkaCluster, String connectCluster
*/
public Mono<HttpResponse<Void>> resume(String kafkaCluster, String connectCluster, String connector) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request =
HttpRequest.PUT(URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + connector + "/resume")),
null)
.basicAuth(config.getUsername(), config.getPassword());
String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.PUT(
URI.create(StringUtils.prependUri(config.getUrl(),
CONNECTORS + encodedConnector + "/resume")), null)
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.exchange(request, Void.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ private boolean connectorsAreSame(Connector expected, Connector actual) {
private Mono<ConnectorInfo> deployConnector(Connector connector) {
return kafkaConnectClient.createOrUpdate(managedClusterProperties.getName(),
connector.getSpec().getConnectCluster(),
connector.getMetadata().getName(), ConnectorSpecs.builder()
connector.getMetadata().getName(),
ConnectorSpecs.builder()
.config(connector.getSpec().getConfig())
.build())
.doOnSuccess(
Expand Down

0 comments on commit deef285

Please sign in to comment.