From 8a3b4f127d52c4d5e5881008c85e50889d192655 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20GREFFIER?= Date: Tue, 15 Oct 2024 15:54:15 +0200 Subject: [PATCH] Fix SSRF on schema registry client (#468) * Attempt to fixing ssrf * Attempt fixing SSRF on schema registry client --- .../ns4kafka/service/SchemaService.java | 6 +- .../client/schema/SchemaRegistryClient.java | 127 ++++++++++++------ 2 files changed, 90 insertions(+), 43 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/service/SchemaService.java b/src/main/java/com/michelin/ns4kafka/service/SchemaService.java index f659c5cd..fe2a62f2 100644 --- a/src/main/java/com/michelin/ns4kafka/service/SchemaService.java +++ b/src/main/java/com/michelin/ns4kafka/service/SchemaService.java @@ -216,8 +216,10 @@ private Mono> validateReferences(Namespace ns, Schema schema) { */ public Mono register(Namespace namespace, Schema schema) { return schemaRegistryClient - .register(namespace.getMetadata().getCluster(), - schema.getMetadata().getName(), SchemaRequest.builder() + .register( + namespace.getMetadata().getCluster(), + schema.getMetadata().getName(), + SchemaRequest.builder() .schemaType(String.valueOf(schema.getSpec().getSchemaType())) .schema(schema.getSpec().getSchema()) .references(schema.getSpec().getReferences()) diff --git a/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java b/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java index 288d5ab9..6c2579bf 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java @@ -24,6 +24,8 @@ import jakarta.inject.Inject; import jakarta.inject.Singleton; import java.net.URI; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -71,9 +73,14 @@ public Flux getSubjects(String kafkaCluster) { */ public Mono getSubject(String kafkaCluster, String subject, String version) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); + String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8); + String encodedVersion = URLEncoder.encode(version, StandardCharsets.UTF_8); + HttpRequest request = HttpRequest.GET( - URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + VERSIONS + version))) + URI.create(StringUtils.prependUri(config.getUrl(), + SUBJECTS + encodedSubject + VERSIONS + encodedVersion))) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, SchemaResponse.class)) .onErrorResume(HttpClientResponseException.class, ex -> ex.getStatus().equals(HttpStatus.NOT_FOUND) ? Mono.empty() : Mono.error(ex)); @@ -88,15 +95,18 @@ public Mono getSubject(String kafkaCluster, String subject, Stri */ public Flux getAllSubjectVersions(String kafkaCluster, String subject) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); + String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8); + HttpRequest request = HttpRequest.GET( - URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "/versions"))) + URI.create(StringUtils.prependUri(config.getUrl(), + SUBJECTS + encodedSubject + "/versions"))) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); return Flux.from(httpClient.retrieve(request, Integer[].class)) .flatMap(ids -> Flux.fromIterable(Arrays.asList(ids)) .flatMap(id -> { - HttpRequest requestVersion = HttpRequest.GET( - URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + VERSIONS + id))) + HttpRequest requestVersion = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), + SUBJECTS + encodedSubject + VERSIONS + id))) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); return httpClient.retrieve(requestVersion, SchemaResponse.class); @@ -115,10 +125,13 @@ public Flux getAllSubjectVersions(String kafkaCluster, String su */ public Mono register(String kafkaCluster, String subject, SchemaRequest body) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); - HttpRequest request = - HttpRequest.POST(URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "/versions")), - body) - .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8); + + HttpRequest request = HttpRequest.POST( + URI.create(StringUtils.prependUri(config.getUrl(), + SUBJECTS + encodedSubject + "/versions")), body) + .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, SchemaResponse.class)); } @@ -132,9 +145,13 @@ public Mono register(String kafkaCluster, String subject, Schema */ public Mono deleteSubject(String kafkaCluster, String subject, boolean hardDelete) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); + String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8); + MutableHttpRequest request = HttpRequest.DELETE( - URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "?permanent=" + hardDelete))) + URI.create(StringUtils.prependUri(config.getUrl(), + SUBJECTS + encodedSubject + "?permanent=" + hardDelete))) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, Integer[].class)); } @@ -147,13 +164,16 @@ public Mono deleteSubject(String kafkaCluster, String subject, boolea * @param hardDelete Should the subject be hard deleted or not * @return The version of the deleted subject */ - public Mono deleteSubjectVersion(String kafkaCluster, String subject, String version, - boolean hardDelete) { + public Mono deleteSubjectVersion(String kafkaCluster, String subject, String version, boolean hardDelete) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); + String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8); + String encodedVersion = URLEncoder.encode(version, StandardCharsets.UTF_8); + MutableHttpRequest request = HttpRequest.DELETE( - URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + VERSIONS + version - + "?permanent=" + hardDelete))) + URI.create(StringUtils.prependUri(config.getUrl(), + SUBJECTS + encodedSubject + VERSIONS + encodedVersion + "?permanent=" + hardDelete))) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, Integer.class)); } @@ -165,14 +185,17 @@ public Mono deleteSubjectVersion(String kafkaCluster, String subject, S * @param body The request * @return The schema compatibility validation */ - public Mono validateSchemaCompatibility(String kafkaCluster, String subject, + public Mono validateSchemaCompatibility(String kafkaCluster, + String subject, SchemaRequest body) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); - HttpRequest request = HttpRequest.POST(URI.create( - StringUtils.prependUri(config.getUrl(), "/compatibility/subjects/" + subject - + "/versions?verbose=true")), - body) + String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8); + + HttpRequest request = HttpRequest.POST( + URI.create(StringUtils.prependUri(config.getUrl(), + "/compatibility/subjects/" + encodedSubject + "/versions?verbose=true")), body) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, SchemaCompatibilityCheckResponse.class)) .onErrorResume(HttpClientResponseException.class, ex -> ex.getStatus().equals(HttpStatus.NOT_FOUND) ? Mono.empty() : Mono.error(ex)); @@ -186,12 +209,17 @@ public Mono validateSchemaCompatibility(String * @param body The schema compatibility request * @return The schema compatibility update */ - public Mono updateSubjectCompatibility(String kafkaCluster, String subject, + public Mono updateSubjectCompatibility(String kafkaCluster, + String subject, SchemaCompatibilityRequest body) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); - HttpRequest request = - HttpRequest.PUT(URI.create(StringUtils.prependUri(config.getUrl(), CONFIG + subject)), body) - .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8); + + HttpRequest request = HttpRequest.PUT( + URI.create(StringUtils.prependUri(config.getUrl(), + CONFIG + encodedSubject)), body) + .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, SchemaCompatibilityResponse.class)); } @@ -204,8 +232,13 @@ public Mono updateSubjectCompatibility(String kafka */ public Mono getCurrentCompatibilityBySubject(String kafkaCluster, String subject) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); - HttpRequest request = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), CONFIG + subject))) + String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8); + + HttpRequest request = HttpRequest.GET( + URI.create(StringUtils.prependUri(config.getUrl(), + CONFIG + encodedSubject))) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, SchemaCompatibilityResponse.class)) .onErrorResume(HttpClientResponseException.class, ex -> ex.getStatus().equals(HttpStatus.NOT_FOUND) ? Mono.empty() : Mono.error(ex)); @@ -220,9 +253,13 @@ public Mono getCurrentCompatibilityBySubject(String */ public Mono deleteCurrentCompatibilityBySubject(String kafkaCluster, String subject) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); - MutableHttpRequest request = - HttpRequest.DELETE(URI.create(StringUtils.prependUri(config.getUrl(), CONFIG + subject))) - .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8); + + MutableHttpRequest request = HttpRequest.DELETE( + URI.create(StringUtils.prependUri(config.getUrl(), + CONFIG + encodedSubject))) + .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, SchemaCompatibilityResponse.class)); } @@ -235,11 +272,12 @@ public Mono deleteCurrentCompatibilityBySubject(Str */ public Mono> associateTags(String kafkaCluster, List tagSpecs) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); - HttpRequest request = HttpRequest - .POST(URI.create(StringUtils.prependUri( - config.getUrl(), + + HttpRequest request = HttpRequest.POST( + URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/entity/tags")), tagSpecs) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class))); } @@ -252,9 +290,12 @@ public Mono> associateTags(String kafkaCluster, List> createTags(String kafkaCluster, List tags) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); - HttpRequest request = HttpRequest.POST(URI.create(StringUtils.prependUri( - config.getUrl(), "/catalog/v1/types/tagdefs")), tags) + + HttpRequest request = HttpRequest.POST( + URI.create(StringUtils.prependUri(config.getUrl(), + "/catalog/v1/types/tagdefs")), tags) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class))); } @@ -268,11 +309,12 @@ public Mono> createTags(String kafkaCluster, List tags) { */ public Mono> dissociateTag(String kafkaCluster, String entityName, String tagName) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); - HttpRequest request = HttpRequest - .DELETE(URI.create(StringUtils.prependUri( - config.getUrl(), + + HttpRequest request = HttpRequest.DELETE( + URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags/" + tagName))) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.exchange(request, Void.class)); } @@ -284,11 +326,12 @@ public Mono> dissociateTag(String kafkaCluster, String entity */ public Mono getTopicWithCatalogInfo(String kafkaCluster, int limit, int offset) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); - HttpRequest request = HttpRequest - .GET(URI.create(StringUtils.prependUri( - config.getUrl(), "/catalog/v1/search/basic?type=kafka_topic&limit=" - + limit + "&offset=" + offset))) + + HttpRequest request = HttpRequest.GET( + URI.create(StringUtils.prependUri(config.getUrl(), + "/catalog/v1/search/basic?type=kafka_topic&limit=" + limit + "&offset=" + offset))) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, TopicListResponse.class)); } @@ -302,10 +345,12 @@ public Mono getTopicWithCatalogInfo(String kafkaCluster, int public Mono> updateDescription(String kafkaCluster, TopicDescriptionUpdateBody body) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); - HttpRequest request = HttpRequest - .PUT(URI.create(StringUtils.prependUri( - config.getUrl(), "/catalog/v1/entity")), body) + + HttpRequest request = HttpRequest.PUT( + URI.create(StringUtils.prependUri(config.getUrl(), + "/catalog/v1/entity")), body) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.exchange(request, TopicDescriptionUpdateResponse.class)); }