Skip to content

Commit

Permalink
Fix SSRF on schema registry client (#468)
Browse files Browse the repository at this point in the history
* Attempt to fixing ssrf

* Attempt fixing SSRF on schema registry client
  • Loading branch information
loicgreffier authored Oct 15, 2024
1 parent a6e6c18 commit 8a3b4f1
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,10 @@ private Mono<List<String>> validateReferences(Namespace ns, Schema schema) {
*/
public Mono<Integer> register(Namespace namespace, Schema schema) {
return schemaRegistryClient
.register(namespace.getMetadata().getCluster(),
schema.getMetadata().getName(), SchemaRequest.builder()
.register(
namespace.getMetadata().getCluster(),
schema.getMetadata().getName(),
SchemaRequest.builder()
.schemaType(String.valueOf(schema.getSpec().getSchemaType()))
.schema(schema.getSpec().getSchema())
.references(schema.getSpec().getReferences())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -71,9 +73,14 @@ public Flux<String> getSubjects(String kafkaCluster) {
*/
public Mono<SchemaResponse> getSubject(String kafkaCluster, String subject, String version) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);
String encodedVersion = URLEncoder.encode(version, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + VERSIONS + version)))
URI.create(StringUtils.prependUri(config.getUrl(),
SUBJECTS + encodedSubject + VERSIONS + encodedVersion)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, SchemaResponse.class))
.onErrorResume(HttpClientResponseException.class,
ex -> ex.getStatus().equals(HttpStatus.NOT_FOUND) ? Mono.empty() : Mono.error(ex));
Expand All @@ -88,15 +95,18 @@ public Mono<SchemaResponse> getSubject(String kafkaCluster, String subject, Stri
*/
public Flux<SchemaResponse> getAllSubjectVersions(String kafkaCluster, String subject) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "/versions")))
URI.create(StringUtils.prependUri(config.getUrl(),
SUBJECTS + encodedSubject + "/versions")))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Flux.from(httpClient.retrieve(request, Integer[].class))
.flatMap(ids -> Flux.fromIterable(Arrays.asList(ids))
.flatMap(id -> {
HttpRequest<?> requestVersion = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + VERSIONS + id)))
HttpRequest<?> requestVersion = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(),
SUBJECTS + encodedSubject + VERSIONS + id)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return httpClient.retrieve(requestVersion, SchemaResponse.class);
Expand All @@ -115,10 +125,13 @@ public Flux<SchemaResponse> getAllSubjectVersions(String kafkaCluster, String su
*/
public Mono<SchemaResponse> register(String kafkaCluster, String subject, SchemaRequest body) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request =
HttpRequest.POST(URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "/versions")),
body)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.POST(
URI.create(StringUtils.prependUri(config.getUrl(),
SUBJECTS + encodedSubject + "/versions")), body)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, SchemaResponse.class));
}

Expand All @@ -132,9 +145,13 @@ public Mono<SchemaResponse> register(String kafkaCluster, String subject, Schema
*/
public Mono<Integer[]> deleteSubject(String kafkaCluster, String subject, boolean hardDelete) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);

MutableHttpRequest<?> request = HttpRequest.DELETE(
URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "?permanent=" + hardDelete)))
URI.create(StringUtils.prependUri(config.getUrl(),
SUBJECTS + encodedSubject + "?permanent=" + hardDelete)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, Integer[].class));
}

Expand All @@ -147,13 +164,16 @@ public Mono<Integer[]> deleteSubject(String kafkaCluster, String subject, boolea
* @param hardDelete Should the subject be hard deleted or not
* @return The version of the deleted subject
*/
public Mono<Integer> deleteSubjectVersion(String kafkaCluster, String subject, String version,
boolean hardDelete) {
public Mono<Integer> deleteSubjectVersion(String kafkaCluster, String subject, String version, boolean hardDelete) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);
String encodedVersion = URLEncoder.encode(version, StandardCharsets.UTF_8);

MutableHttpRequest<?> request = HttpRequest.DELETE(
URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + VERSIONS + version
+ "?permanent=" + hardDelete)))
URI.create(StringUtils.prependUri(config.getUrl(),
SUBJECTS + encodedSubject + VERSIONS + encodedVersion + "?permanent=" + hardDelete)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, Integer.class));
}

Expand All @@ -165,14 +185,17 @@ public Mono<Integer> deleteSubjectVersion(String kafkaCluster, String subject, S
* @param body The request
* @return The schema compatibility validation
*/
public Mono<SchemaCompatibilityCheckResponse> validateSchemaCompatibility(String kafkaCluster, String subject,
public Mono<SchemaCompatibilityCheckResponse> validateSchemaCompatibility(String kafkaCluster,
String subject,
SchemaRequest body) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.POST(URI.create(
StringUtils.prependUri(config.getUrl(), "/compatibility/subjects/" + subject
+ "/versions?verbose=true")),
body)
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.POST(
URI.create(StringUtils.prependUri(config.getUrl(),
"/compatibility/subjects/" + encodedSubject + "/versions?verbose=true")), body)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, SchemaCompatibilityCheckResponse.class))
.onErrorResume(HttpClientResponseException.class,
ex -> ex.getStatus().equals(HttpStatus.NOT_FOUND) ? Mono.empty() : Mono.error(ex));
Expand All @@ -186,12 +209,17 @@ public Mono<SchemaCompatibilityCheckResponse> validateSchemaCompatibility(String
* @param body The schema compatibility request
* @return The schema compatibility update
*/
public Mono<SchemaCompatibilityResponse> updateSubjectCompatibility(String kafkaCluster, String subject,
public Mono<SchemaCompatibilityResponse> updateSubjectCompatibility(String kafkaCluster,
String subject,
SchemaCompatibilityRequest body) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request =
HttpRequest.PUT(URI.create(StringUtils.prependUri(config.getUrl(), CONFIG + subject)), body)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.PUT(
URI.create(StringUtils.prependUri(config.getUrl(),
CONFIG + encodedSubject)), body)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, SchemaCompatibilityResponse.class));
}

Expand All @@ -204,8 +232,13 @@ public Mono<SchemaCompatibilityResponse> updateSubjectCompatibility(String kafka
*/
public Mono<SchemaCompatibilityResponse> getCurrentCompatibilityBySubject(String kafkaCluster, String subject) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), CONFIG + subject)))
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(),
CONFIG + encodedSubject)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, SchemaCompatibilityResponse.class))
.onErrorResume(HttpClientResponseException.class,
ex -> ex.getStatus().equals(HttpStatus.NOT_FOUND) ? Mono.empty() : Mono.error(ex));
Expand All @@ -220,9 +253,13 @@ public Mono<SchemaCompatibilityResponse> getCurrentCompatibilityBySubject(String
*/
public Mono<SchemaCompatibilityResponse> deleteCurrentCompatibilityBySubject(String kafkaCluster, String subject) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
MutableHttpRequest<?> request =
HttpRequest.DELETE(URI.create(StringUtils.prependUri(config.getUrl(), CONFIG + subject)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);

MutableHttpRequest<?> request = HttpRequest.DELETE(
URI.create(StringUtils.prependUri(config.getUrl(),
CONFIG + encodedSubject)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, SchemaCompatibilityResponse.class));
}

Expand All @@ -235,11 +272,12 @@ public Mono<SchemaCompatibilityResponse> deleteCurrentCompatibilityBySubject(Str
*/
public Mono<List<TagTopicInfo>> associateTags(String kafkaCluster, List<TagTopicInfo> tagSpecs) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.POST(URI.create(StringUtils.prependUri(
config.getUrl(),

HttpRequest<?> request = HttpRequest.POST(
URI.create(StringUtils.prependUri(config.getUrl(),
"/catalog/v1/entity/tags")), tagSpecs)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class)));
}

Expand All @@ -252,9 +290,12 @@ public Mono<List<TagTopicInfo>> associateTags(String kafkaCluster, List<TagTopic
*/
public Mono<List<TagInfo>> createTags(String kafkaCluster, List<TagInfo> tags) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.POST(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/types/tagdefs")), tags)

HttpRequest<?> request = HttpRequest.POST(
URI.create(StringUtils.prependUri(config.getUrl(),
"/catalog/v1/types/tagdefs")), tags)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class)));
}

Expand All @@ -268,11 +309,12 @@ public Mono<List<TagInfo>> createTags(String kafkaCluster, List<TagInfo> tags) {
*/
public Mono<HttpResponse<Void>> dissociateTag(String kafkaCluster, String entityName, String tagName) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.DELETE(URI.create(StringUtils.prependUri(
config.getUrl(),

HttpRequest<?> request = HttpRequest.DELETE(
URI.create(StringUtils.prependUri(config.getUrl(),
"/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags/" + tagName)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.exchange(request, Void.class));
}

Expand All @@ -284,11 +326,12 @@ public Mono<HttpResponse<Void>> dissociateTag(String kafkaCluster, String entity
*/
public Mono<TopicListResponse> getTopicWithCatalogInfo(String kafkaCluster, int limit, int offset) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.GET(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/search/basic?type=kafka_topic&limit="
+ limit + "&offset=" + offset)))

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(),
"/catalog/v1/search/basic?type=kafka_topic&limit=" + limit + "&offset=" + offset)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, TopicListResponse.class));
}

Expand All @@ -302,10 +345,12 @@ public Mono<TopicListResponse> getTopicWithCatalogInfo(String kafkaCluster, int
public Mono<HttpResponse<TopicDescriptionUpdateResponse>> updateDescription(String kafkaCluster,
TopicDescriptionUpdateBody body) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.PUT(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/entity")), body)

HttpRequest<?> request = HttpRequest.PUT(
URI.create(StringUtils.prependUri(config.getUrl(),
"/catalog/v1/entity")), body)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.exchange(request, TopicDescriptionUpdateResponse.class));
}

Expand Down

0 comments on commit 8a3b4f1

Please sign in to comment.