Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SSRF on Kafka Connect client #474

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,12 @@ public boolean isNamespaceOwnerOfConnect(Namespace namespace, String connect) {
*/
public Mono<List<String>> validateRemotely(Namespace namespace, Connector connector) {
return kafkaConnectClient.validate(namespace.getMetadata().getCluster(),
connector.getSpec().getConnectCluster(), connector.getSpec().getConfig().get(CONNECTOR_CLASS),
ConnectorSpecs.builder().config(connector.getSpec().getConfig()).build())
connector.getSpec().getConnectCluster(),
connector.getSpec().getConfig().get(CONNECTOR_CLASS),
ConnectorSpecs.builder()
.config(connector.getSpec().getConfig())
.build()
)
.map(configInfos -> configInfos.configs()
.stream()
.filter(configInfo -> !configInfo.configValue().errors().isEmpty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -60,8 +62,11 @@ public class KafkaConnectClient {
*/
public Mono<HttpResponse<ServerInfo>> version(String kafkaCluster, String connectCluster) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), "/")))

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(), "/")))
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.exchange(request, ServerInfo.class));
}

Expand All @@ -74,9 +79,12 @@ public Mono<HttpResponse<ServerInfo>> version(String kafkaCluster, String connec
*/
public Mono<Map<String, ConnectorStatus>> listAll(String kafkaCluster, String connectCluster) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(), "/connectors?expand=info&expand=status")))
URI.create(StringUtils.prependUri(config.getUrl(),
"/connectors?expand=info&expand=status")))
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.retrieve(request, Argument.mapOf(String.class, ConnectorStatus.class)));
}

Expand All @@ -89,14 +97,19 @@ public Mono<Map<String, ConnectorStatus>> listAll(String kafkaCluster, String co
* @param connectorSpecs The connector config
* @return The configuration infos
*/
public Mono<ConfigInfos> validate(String kafkaCluster, String connectCluster, String connectorClass,
public Mono<ConfigInfos> validate(String kafkaCluster,
String connectCluster,
String connectorClass,
ConnectorSpecs connectorSpecs) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request = HttpRequest.PUT(URI.create(
StringUtils.prependUri(config.getUrl(), "/connector-plugins/"
+ connectorClass + "/config/validate")),
String encodedConnectorClass = URLEncoder.encode(connectorClass, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.PUT(
URI.create(StringUtils.prependUri(config.getUrl(),
"/connector-plugins/" + encodedConnectorClass + "/config/validate")),
connectorSpecs)
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.retrieve(request, ConfigInfos.class));
}

Expand All @@ -109,13 +122,19 @@ public Mono<ConfigInfos> validate(String kafkaCluster, String connectCluster, St
* @param connectorSpecs The connector config
* @return The creation or update response
*/
public Mono<ConnectorInfo> createOrUpdate(String kafkaCluster, String connectCluster, String connector,
public Mono<ConnectorInfo> createOrUpdate(String kafkaCluster,
String connectCluster,
String connector,
ConnectorSpecs connectorSpecs) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request =
HttpRequest.PUT(URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + connector + "/config")),
connectorSpecs)
.basicAuth(config.getUsername(), config.getPassword());
String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.PUT(
URI.create(StringUtils.prependUri(config.getUrl(),
CONNECTORS + encodedConnector + "/config")),
connectorSpecs)
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.retrieve(request, ConnectorInfo.class));
}

Expand All @@ -129,9 +148,13 @@ public Mono<ConnectorInfo> createOrUpdate(String kafkaCluster, String connectClu
*/
public Mono<HttpResponse<Void>> delete(String kafkaCluster, String connectCluster, String connector) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request =
HttpRequest.DELETE(URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + connector)))
.basicAuth(config.getUsername(), config.getPassword());
String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.DELETE(
URI.create(StringUtils.prependUri(config.getUrl(),
CONNECTORS + encodedConnector)))
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.exchange(request, Void.class));
}

Expand All @@ -144,9 +167,12 @@ public Mono<HttpResponse<Void>> delete(String kafkaCluster, String connectCluste
*/
public Mono<List<ConnectorPluginInfo>> connectPlugins(String kafkaCluster, String connectCluster) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request =
HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), "/connector-plugins")))
.basicAuth(config.getUsername(), config.getPassword());

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(),
"/connector-plugins")))
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.retrieve(request, Argument.listOf(ConnectorPluginInfo.class)));
}

Expand All @@ -160,9 +186,13 @@ public Mono<List<ConnectorPluginInfo>> connectPlugins(String kafkaCluster, Strin
*/
public Mono<ConnectorStateInfo> status(String kafkaCluster, String connectCluster, String connector) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request =
HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + connector + "/status")))
.basicAuth(config.getUsername(), config.getPassword());
String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(),
CONNECTORS + encodedConnector + "/status")))
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.retrieve(request, ConnectorStateInfo.class));
}

Expand All @@ -177,10 +207,13 @@ public Mono<ConnectorStateInfo> status(String kafkaCluster, String connectCluste
*/
public Mono<HttpResponse<Void>> restart(String kafkaCluster, String connectCluster, String connector, int taskId) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request = HttpRequest.POST(URI.create(
StringUtils.prependUri(config.getUrl(), CONNECTORS + connector + "/tasks/"
+ taskId + "/restart")), null)
String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.POST(
URI.create(StringUtils.prependUri(config.getUrl(),
CONNECTORS + encodedConnector + "/tasks/" + taskId + "/restart")), null)
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.exchange(request, Void.class));
}

Expand All @@ -194,10 +227,13 @@ public Mono<HttpResponse<Void>> restart(String kafkaCluster, String connectClust
*/
public Mono<HttpResponse<Void>> pause(String kafkaCluster, String connectCluster, String connector) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request =
HttpRequest.PUT(URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + connector + "/pause")),
null)
.basicAuth(config.getUsername(), config.getPassword());
String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.PUT(
URI.create(StringUtils.prependUri(config.getUrl(),
CONNECTORS + encodedConnector + "/pause")), null)
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.exchange(request, Void.class));
}

Expand All @@ -211,10 +247,13 @@ public Mono<HttpResponse<Void>> pause(String kafkaCluster, String connectCluster
*/
public Mono<HttpResponse<Void>> resume(String kafkaCluster, String connectCluster, String connector) {
KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster);
HttpRequest<?> request =
HttpRequest.PUT(URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + connector + "/resume")),
null)
.basicAuth(config.getUsername(), config.getPassword());
String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.PUT(
URI.create(StringUtils.prependUri(config.getUrl(),
CONNECTORS + encodedConnector + "/resume")), null)
.basicAuth(config.getUsername(), config.getPassword());

return Mono.from(httpClient.exchange(request, Void.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ private boolean connectorsAreSame(Connector expected, Connector actual) {
private Mono<ConnectorInfo> deployConnector(Connector connector) {
return kafkaConnectClient.createOrUpdate(managedClusterProperties.getName(),
connector.getSpec().getConnectCluster(),
connector.getMetadata().getName(), ConnectorSpecs.builder()
connector.getMetadata().getName(),
ConnectorSpecs.builder()
.config(connector.getSpec().getConfig())
.build())
.doOnSuccess(
Expand Down