From ca76ea46f4f225ef9c8d96fa7d365b17757a820b Mon Sep 17 00:00:00 2001 From: thcai Date: Wed, 11 Dec 2024 17:22:04 +0100 Subject: [PATCH] Improve connect-cluster vault error handling & messages --- .../connect/ConnectClusterController.java | 11 +- .../service/ConnectClusterService.java | 52 +-- .../ns4kafka/util/FormatErrorUtils.java | 21 +- .../ConnectClusterControllerTest.java | 100 +++--- .../service/ConnectClusterServiceTest.java | 300 +++--------------- 5 files changed, 106 insertions(+), 378 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java b/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java index 413ad94a..fb46e6f8 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java @@ -1,7 +1,6 @@ package com.michelin.ns4kafka.controller.connect; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterDeleteOperation; -import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterNotAllowed; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidOwner; import static com.michelin.ns4kafka.util.enumation.Kind.CONNECT_CLUSTER; import static io.micronaut.core.util.StringUtils.EMPTY_STRING; @@ -246,7 +245,8 @@ public HttpResponse> bulkDelete(String namespace, public List listVaults(final String namespace) { return connectClusterService.findAllForNamespaceWithWritePermission(getNamespace(namespace)) .stream() - .filter(connectCluster -> StringUtils.hasText(connectCluster.getSpec().getAes256Key())) + .filter(connectCluster -> StringUtils.hasText(connectCluster.getSpec().getAes256Key()) + && StringUtils.hasText(connectCluster.getSpec().getAes256Salt())) .toList(); } @@ -264,12 +264,7 @@ public List vaultPassword(final String namespace, @Body final List passwords) { final Namespace ns = getNamespace(namespace); - final var validationErrors = new ArrayList(); - if (!connectClusterService.isNamespaceAllowedForConnectCluster(ns, connectCluster)) { - validationErrors.add(invalidConnectClusterNotAllowed(connectCluster)); - } - - validationErrors.addAll(connectClusterService.validateConnectClusterVault(ns, connectCluster)); + List validationErrors = connectClusterService.validateConnectClusterVault(ns, connectCluster); if (!validationErrors.isEmpty()) { throw new ResourceValidationException(CONNECT_CLUSTER, connectCluster, validationErrors); diff --git a/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java b/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java index fb622c61..ed1ea64f 100644 --- a/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java +++ b/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java @@ -3,6 +3,7 @@ import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterEncryptionConfig; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterMalformedUrl; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterNameAlreadyExistGlobally; +import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterNoEncryptionConfig; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterNotHealthy; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidNotFound; @@ -17,7 +18,6 @@ import com.michelin.ns4kafka.service.client.connect.KafkaConnectClient; import com.michelin.ns4kafka.service.client.connect.entities.ServerInfo; import com.michelin.ns4kafka.util.EncryptionUtils; -import com.michelin.ns4kafka.util.FormatErrorUtils; import com.michelin.ns4kafka.util.RegexUtils; import io.micronaut.core.util.StringUtils; import io.micronaut.http.HttpRequest; @@ -32,7 +32,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @@ -190,7 +189,8 @@ public Optional findByNameWithOwnerPermission(Namespace namespac public List findAllForNamespaceWithWritePermission(Namespace namespace) { return Stream.concat( findByWildcardNameWithOwnerPermission(namespace, "*").stream(), - findAllForNamespaceByPermissions(namespace, List.of(AccessControlEntry.Permission.WRITE)).stream() + findAllForNamespaceByPermissions(namespace, List.of(AccessControlEntry.Permission.WRITE)) + .stream() .map(connectCluster -> ConnectCluster.builder() .metadata(connectCluster.getMetadata()) .spec(ConnectCluster.ConnectClusterSpec.builder() @@ -281,41 +281,26 @@ public Mono> validateConnectClusterCreation(ConnectCluster connectC } /** - * Validate the given connect worker has configuration for vaults. + * Validate the given connect worker has configuration for vault. * * @param connectCluster The Kafka connect worker to validate * @return A list of validation errors */ public List validateConnectClusterVault(final Namespace namespace, final String connectCluster) { final var errors = new ArrayList(); + Optional kafkaConnect = findAllForNamespaceWithWritePermission(namespace) + .stream() + .filter(cc -> cc.getMetadata().getName().equals(connectCluster)) + .findFirst(); - final List kafkaConnects = findAllForNamespaceByPermissions(namespace, - List.of(AccessControlEntry.Permission.OWNER, AccessControlEntry.Permission.WRITE)); - - if (kafkaConnects.isEmpty()) { + if (kafkaConnect.isEmpty()) { errors.add(invalidNotFound(connectCluster)); return errors; } - if (kafkaConnects.stream().noneMatch(cc -> StringUtils.hasText(cc.getSpec().getAes256Key()) - && StringUtils.hasText(cc.getSpec().getAes256Salt()))) { - errors.add(invalidConnectClusterEncryptionConfig()); - return errors; - } - - final Optional kafkaConnect = kafkaConnects.stream() - .filter(cc -> cc.getMetadata().getName().equals(connectCluster) - && StringUtils.hasText(cc.getSpec().getAes256Key()) - && StringUtils.hasText(cc.getSpec().getAes256Salt())) - .findFirst(); - - if (kafkaConnect.isEmpty()) { - final String allowedConnectClusters = kafkaConnects.stream() - .filter(cc -> StringUtils.hasText(cc.getSpec().getAes256Key()) - && StringUtils.hasText(cc.getSpec().getAes256Salt())) - .map(cc -> cc.getMetadata().getName()) - .collect(Collectors.joining(", ")); - errors.add(FormatErrorUtils.invalidConnectClusterMustBeOneOf(connectCluster, allowedConnectClusters)); + if (!StringUtils.hasText(kafkaConnect.get().getSpec().getAes256Key()) + || !StringUtils.hasText(kafkaConnect.get().getSpec().getAes256Salt())) { + errors.add(invalidConnectClusterNoEncryptionConfig()); return errors; } @@ -343,19 +328,6 @@ public boolean isNamespaceOwnerOfConnectCluster(Namespace namespace, String conn AccessControlEntry.ResourceType.CONNECT_CLUSTER, connectCluster); } - /** - * Is given namespace allowed (Owner or Writer) for the given connect worker. - * - * @param namespace The namespace - * @param connectCluster The Kafka connect cluster - * @return true if it is, false otherwise - */ - public boolean isNamespaceAllowedForConnectCluster(Namespace namespace, String connectCluster) { - return findAllForNamespaceWithWritePermission(namespace) - .stream() - .anyMatch(kafkaConnect -> kafkaConnect.getMetadata().getName().equals(connectCluster)); - } - /** * Vault a password for a specific namespace and a kafka connect cluster. * diff --git a/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java b/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java index 9d22c087..70e68981 100644 --- a/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java +++ b/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java @@ -181,26 +181,13 @@ public static String invalidConnectClusterNameAlreadyExistGlobally(String invali } /** - * Invalid connect cluster namespace not allowed. + * Invalid connect cluster missing encryption config. * - * @param invalidNameValue the invalid field value - * @return the error message - */ - public static String invalidConnectClusterNotAllowed(String invalidNameValue) { - return String.format(INVALID_FIELD, invalidNameValue, "connect-cluster", - "namespace is not allowed to use this Kafka Connect"); - } - - /** - * Invalid connect cluster must be one of. - * - * @param invalidNameValue the invalid field value - * @param allowedConnectClusters the allowed connect clusters * @return the error message */ - public static String invalidConnectClusterMustBeOneOf(String invalidNameValue, - String allowedConnectClusters) { - return invalidValueMustBeOneOf(FIELD_NAME, invalidNameValue, allowedConnectClusters); + public static String invalidConnectClusterNoEncryptionConfig() { + return String.format(INVALID_EMPTY_FIELDS, String.join(", ", "aes256Key", "aes256Salt"), + "AES key and salt are required to use connect-cluster vault"); } /** diff --git a/src/test/java/com/michelin/ns4kafka/controller/ConnectClusterControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/ConnectClusterControllerTest.java index 88d5e04f..40adf317 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/ConnectClusterControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/ConnectClusterControllerTest.java @@ -632,7 +632,7 @@ void shouldUpdateConnectClusterWhenChanged() { } @Test - void shouldCreateConnectClusterInDryRunMode() { + void shouldNotCreateConnectClusterInDryRunMode() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() .name("test") @@ -666,7 +666,28 @@ void shouldCreateConnectClusterInDryRunMode() { } @Test - void shouldListVaultConnectClusterReturnEmptyWhenNoAes256Config() { + void shouldListNoVaultConnectClusterWhenNoConnectCluster() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .spec(Namespace.NamespaceSpec.builder() + .topicValidator(TopicValidator.makeDefault()) + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + when(connectClusterService.findAllForNamespaceWithWritePermission(ns)) + .thenReturn(List.of()); + + List actual = connectClusterController.listVaults("test"); + assertTrue(actual.isEmpty()); + } + + @Test + void shouldListNoVaultConnectClusterWhenNoAes256Config() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() .name("test") @@ -706,7 +727,7 @@ void shouldListVaultConnectClusterWhenAes256Config() { .build()) .build(); - ConnectCluster connectCluster = ConnectCluster.builder() + ConnectCluster ccNoAes = ConnectCluster.builder() .metadata(Metadata.builder() .name("connect-cluster") .build()) @@ -714,51 +735,40 @@ void shouldListVaultConnectClusterWhenAes256Config() { .build()) .build(); - ConnectCluster connectClusterAes256 = ConnectCluster.builder() + ConnectCluster ccAes256Key = ConnectCluster.builder() .metadata(Metadata.builder() - .name("connect-cluster-aes256") + .name("connect-cluster") .build()) .spec(ConnectCluster.ConnectClusterSpec.builder() .aes256Key("myKeyEncryption") - .aes256Salt("p8t42EhY9z2eSUdpGeq7HX7RboMrsJAhUnu3EEJJVS") .build()) .build(); - when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); - when(connectClusterService.findAllForNamespaceWithWritePermission(ns)).thenReturn( - List.of(connectCluster, connectClusterAes256)); - - List actual = connectClusterController.listVaults("test"); - assertEquals(1, actual.size()); - } + ConnectCluster ccAes256Salt = ConnectCluster.builder() + .metadata(Metadata.builder() + .name("connect-cluster") + .build()) + .spec(ConnectCluster.ConnectClusterSpec.builder() + .aes256Salt("p8t42EhY9z2eSUdpGeq7HX7RboMrsJAhUnu3EEJJVS") + .build()) + .build(); - @Test - void shouldNotVaultOnNotAllowedConnectCluster() { - String connectClusterName = "connect-cluster-na"; - Namespace ns = Namespace.builder() + ConnectCluster ccAes256KeySalt = ConnectCluster.builder() .metadata(Metadata.builder() - .name("test") - .cluster("local") + .name("connect-cluster-aes256") .build()) - .spec(Namespace.NamespaceSpec.builder() - .topicValidator(TopicValidator.makeDefault()) + .spec(ConnectCluster.ConnectClusterSpec.builder() + .aes256Key("myKeyEncryption") + .aes256Salt("p8t42EhY9z2eSUdpGeq7HX7RboMrsJAhUnu3EEJJVS") .build()) .build(); - when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); - when(connectClusterService.isNamespaceAllowedForConnectCluster(ns, connectClusterName)) - .thenReturn(false); - when(connectClusterService.validateConnectClusterVault(ns, connectClusterName)) - .thenReturn(List.of()); + when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); + when(connectClusterService.findAllForNamespaceWithWritePermission(ns)).thenReturn( + List.of(ccNoAes, ccAes256Key, ccAes256Salt, ccAes256KeySalt)); - var secrets = List.of("secret"); - ResourceValidationException result = assertThrows(ResourceValidationException.class, - () -> connectClusterController.vaultPassword("test", connectClusterName, secrets)); - assertEquals(1, result.getValidationErrors().size()); - assertEquals("Invalid value \"connect-cluster-na\" for field \"connect-cluster\": " - + "namespace is not allowed to use this Kafka Connect.", - result.getValidationErrors().getFirst()); + List actual = connectClusterController.listVaults("test"); + assertEquals(List.of(ccAes256KeySalt), actual); } @Test @@ -776,8 +786,6 @@ void shouldNotVaultOnConnectClusterWithInvalidAes256Config() { when(namespaceService.findByName("test")) .thenReturn(Optional.of(ns)); - when(connectClusterService.isNamespaceAllowedForConnectCluster(ns, connectClusterName)) - .thenReturn(true); when(connectClusterService.validateConnectClusterVault(ns, connectClusterName)) .thenReturn(List.of("Error config.")); @@ -786,6 +794,7 @@ void shouldNotVaultOnConnectClusterWithInvalidAes256Config() { () -> connectClusterController.vaultPassword("test", connectClusterName, secrets)); assertEquals(1, result.getValidationErrors().size()); assertEquals("Error config.", result.getValidationErrors().getFirst()); + verify(connectClusterService, never()).vaultPassword(any(), any(), any()); } @Test @@ -803,23 +812,10 @@ void shouldVaultOnConnectClusterWithValidAes256Config() { when(namespaceService.findByName("test")) .thenReturn(Optional.of(ns)); - when(connectClusterService.isNamespaceAllowedForConnectCluster(ns, connectClusterName)) - .thenReturn(true); when(connectClusterService.validateConnectClusterVault(ns, connectClusterName)) .thenReturn(List.of()); - when(connectClusterService.vaultPassword(ns, connectClusterName, List.of("secret"))) - .thenReturn(List.of(VaultResponse.builder() - .spec(VaultResponse.VaultResponseSpec.builder() - .clearText("secret") - .encrypted("encryptedSecret") - .build()) - .build() - )); - - final List actual = connectClusterController - .vaultPassword("test", connectClusterName, List.of("secret")); - - assertEquals("secret", actual.getFirst().getSpec().getClearText()); - assertEquals("encryptedSecret", actual.getFirst().getSpec().getEncrypted()); + + connectClusterController.vaultPassword("test", connectClusterName, List.of("secret")); + verify(connectClusterService).vaultPassword(ns, connectClusterName, List.of("secret")); } } diff --git a/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java index 127e1c13..fda6f7ce 100644 --- a/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java @@ -911,7 +911,7 @@ void shouldValidateConnectClusterCreationWhenDownAndMissingKey() { } @Test - void shouldValidateConnectClusterVaultWhenNoClusterAvailable() { + void shouldNotValidateConnectClusterVaultWhenNoConnectClusterAvailable() { Namespace namespace = Namespace.builder() .metadata(Metadata.builder() .name("myNamespace") @@ -927,6 +927,7 @@ void shouldValidateConnectClusterVaultWhenNoClusterAvailable() { .build()) .spec(ConnectCluster.ConnectClusterSpec.builder() .aes256Key("aes256Key") + .aes256Salt("aes256Salt") .build()) .build(); @@ -955,7 +956,8 @@ void shouldValidateConnectClusterVaultWhenNoClusterAvailable() { } @Test - void shouldValidateConnectClusterVaultWhenNoClusterAvailableWithAes256() { + void shouldNotValidateConnectClusterVaultWhenNoConnectClusterAvailableWithAes256() { + String encryptKey = "changeitchangeitchangeitchangeit"; Namespace namespace = Namespace.builder() .metadata(Metadata.builder() .name("myNamespace") @@ -965,34 +967,37 @@ void shouldValidateConnectClusterVaultWhenNoClusterAvailableWithAes256() { .build()) .build(); - ConnectCluster connectCluster1 = ConnectCluster.builder() + ConnectCluster ccNoAesKeyAndSalt = ConnectCluster.builder() .metadata(Metadata.builder() - .name("prefix1.connect-cluster") + .name("prefix.noAesKeyAndSalt") .build()) .spec(ConnectCluster.ConnectClusterSpec.builder() + .password(EncryptionUtils.encryptAes256Gcm("password", encryptKey)) .build()) .build(); - ConnectCluster connectCluster2 = ConnectCluster.builder() + ConnectCluster ccNoAesSalt = ConnectCluster.builder() .metadata(Metadata.builder() - .name("prefix2.connect-cluster") + .name("prefix.noAesSalt") .build()) .spec(ConnectCluster.ConnectClusterSpec.builder() - .aes256Key("aes256Key") + .password(EncryptionUtils.encryptAes256Gcm("password", encryptKey)) + .aes256Salt(EncryptionUtils.encryptAes256Gcm("aes256Salt", encryptKey)) .build()) .build(); - ConnectCluster connectCluster3 = ConnectCluster.builder() + ConnectCluster ccNoAesKey = ConnectCluster.builder() .metadata(Metadata.builder() - .name("prefix3.connect-cluster") + .name("prefix.noAesKey") .build()) .spec(ConnectCluster.ConnectClusterSpec.builder() - .aes256Salt("aes256Salt") + .password(EncryptionUtils.encryptAes256Gcm("password", encryptKey)) + .aes256Key(EncryptionUtils.encryptAes256Gcm("aes256Key", encryptKey)) .build()) .build(); when(connectClusterRepository.findAllForCluster("local")) - .thenReturn(List.of(connectCluster1, connectCluster2, connectCluster3)); + .thenReturn(List.of(ccNoAesKeyAndSalt, ccNoAesSalt, ccNoAesKey)); when(aclService.findAllGrantedToNamespace(namespace)) .thenReturn(List.of( @@ -1002,134 +1007,32 @@ void shouldValidateConnectClusterVaultWhenNoClusterAvailableWithAes256() { .grantedTo("namespace") .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) - .resource("prefix1.") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.WRITE) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) - .resource("prefix2.") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.WRITE) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) - .resource("prefix3.") + .resource("prefix.") .build()) - .build() - )); + .build())); - when(aclService.isResourceCoveredByAcls(any(), eq("prefix1.connect-cluster"))) + when(aclService.isResourceCoveredByAcls(any(), any())) .thenReturn(true); - when(aclService.isResourceCoveredByAcls(any(), eq("prefix2.connect-cluster"))) - .thenReturn(true); - when(aclService.isResourceCoveredByAcls(any(), eq("prefix3.connect-cluster"))) - .thenReturn(true); - - List errors = - connectClusterService.validateConnectClusterVault(namespace, "prefix1.fake-connect-cluster"); - - assertEquals(1L, errors.size()); - assertEquals("Invalid empty value for fields \"aes256Key, aes256Salt\": " - + "AES key and salt are required to activate encryption.", errors.getFirst()); - } - - @Test - void shouldValidateConnectClusterVaultWhenClusterNotAvailable() { - Namespace namespace = Namespace.builder() - .metadata(Metadata.builder() - .name("myNamespace") - .cluster("local") - .build()) - .spec(Namespace.NamespaceSpec.builder() - .build()) - .build(); - - ConnectCluster connectCluster1 = ConnectCluster.builder() - .metadata(Metadata.builder() - .name("prefix1.connect-cluster") - .build()) - .spec(ConnectCluster.ConnectClusterSpec.builder() - .build()) - .build(); - - ConnectCluster connectCluster2 = ConnectCluster.builder() - .metadata(Metadata.builder() - .name("prefix2.connect-cluster") - .build()) - .spec(ConnectCluster.ConnectClusterSpec.builder() - .aes256Key("aes256Key") - .aes256Salt("aes256Salt") - .build()) - .build(); - ConnectCluster connectCluster3 = ConnectCluster.builder() - .metadata(Metadata.builder() - .name("prefix3.connect-cluster") - .build()) - .spec(ConnectCluster.ConnectClusterSpec.builder() - .aes256Key("aes256Key") - .aes256Salt("aes256Salt") - .build()) - .build(); - - when(connectClusterRepository.findAllForCluster("local")) - .thenReturn(List.of(connectCluster1, connectCluster2, connectCluster3)); - - when(aclService.findAllGrantedToNamespace(namespace)) - .thenReturn(List.of( - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.WRITE) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) - .resource("prefix1.") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.WRITE) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) - .resource("prefix2.") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.WRITE) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) - .resource("prefix3.") - .build()) - .build() - )); + when(kafkaConnectClient.version(any(), any())).thenReturn(Mono.just(HttpResponse.ok())); + when(securityProperties.getAes256EncryptionKey()).thenReturn(encryptKey); - when(aclService.isResourceCoveredByAcls(any(), eq("prefix1.connect-cluster"))) - .thenReturn(true); - when(aclService.isResourceCoveredByAcls(any(), eq("prefix2.connect-cluster"))) - .thenReturn(true); - when(aclService.isResourceCoveredByAcls(any(), eq("prefix3.connect-cluster"))) - .thenReturn(true); + assertEquals(List.of("Invalid empty value for fields \"aes256Key, aes256Salt\": " + + "AES key and salt are required to use connect-cluster vault."), + connectClusterService.validateConnectClusterVault(namespace, "prefix.noAesKeyAndSalt")); - List errors = - connectClusterService.validateConnectClusterVault(namespace, "prefix1.fake-connect-cluster"); + assertEquals(List.of("Invalid empty value for fields \"aes256Key, aes256Salt\": " + + "AES key and salt are required to use connect-cluster vault."), + connectClusterService.validateConnectClusterVault(namespace, "prefix.noAesSalt")); - assertEquals(1L, errors.size()); - assertEquals("Invalid value \"prefix1.fake-connect-cluster\" for field \"name\": " - + "value must be one of \"prefix2.connect-cluster, prefix3.connect-cluster\".", errors.getFirst()); + assertEquals(List.of("Invalid empty value for fields \"aes256Key, aes256Salt\": " + + "AES key and salt are required to use connect-cluster vault."), + connectClusterService.validateConnectClusterVault(namespace, "prefix.noAesKey")); } @Test void shouldValidateConnectClusterVault() { + String encryptKey = "changeitchangeitchangeitchangeit"; Namespace namespace = Namespace.builder() .metadata(Metadata.builder() .name("myNamespace") @@ -1144,8 +1047,10 @@ void shouldValidateConnectClusterVault() { .name("prefix.connect-cluster") .build()) .spec(ConnectCluster.ConnectClusterSpec.builder() - .aes256Key("aes256Key") - .aes256Salt("aes256Salt") + .username("username") + .password(EncryptionUtils.encryptAes256Gcm("password", encryptKey)) + .aes256Key(EncryptionUtils.encryptAes256Gcm("aes256Key", encryptKey)) + .aes256Salt(EncryptionUtils.encryptAes256Gcm("aes256Salt", encryptKey)) .build()) .build(); @@ -1162,14 +1067,15 @@ void shouldValidateConnectClusterVault() { .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) .resource("prefix.") .build()) - .build() - )); + .build())); when(aclService.isResourceCoveredByAcls(any(), eq("prefix.connect-cluster"))).thenReturn(true); + when(kafkaConnectClient.version(any(), any())).thenReturn(Mono.just(HttpResponse.ok())); + when(securityProperties.getAes256EncryptionKey()).thenReturn(encryptKey); List errors = connectClusterService.validateConnectClusterVault(namespace, "prefix.connect-cluster"); - assertEquals(0L, errors.size()); + assertTrue(errors.isEmpty()); } @Test @@ -1336,134 +1242,6 @@ void shouldValidateNamespaceOwnerOfConnectCluster() { assertTrue(actual); } - @Test - void shouldNamespaceBeAllowedToWriteToConnectCluster() { - Namespace namespace = Namespace.builder() - .metadata(Metadata.builder() - .name("myNamespace") - .cluster("local") - .build()) - .spec(Namespace.NamespaceSpec.builder() - .build()) - .build(); - - ConnectCluster connectCluster = ConnectCluster.builder() - .metadata(Metadata.builder() - .name("prefix.connect-cluster") - .cluster("local") - .build()) - .spec(ConnectCluster.ConnectClusterSpec.builder() - .build()) - .build(); - - ConnectCluster connectClusterOwner = ConnectCluster.builder() - .metadata(Metadata.builder() - .name("owner.connect-cluster") - .cluster("local") - .build()) - .spec(ConnectCluster.ConnectClusterSpec.builder() - .build()) - .build(); - - when(kafkaConnectClient.version(any(), any())) - .thenReturn(Mono.just(HttpResponse.ok())); - - when(connectClusterRepository.findAllForCluster("local")) - .thenReturn(List.of(connectCluster, connectClusterOwner)); - - when(aclService.findAllGrantedToNamespace(namespace)) - .thenReturn(List.of( - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.WRITE) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) - .resource("prefix.") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) - .resource("owner.") - .build()) - .build() - )); - - when(aclService.isResourceCoveredByAcls(any(), eq("prefix.connect-cluster"))) - .thenReturn(true); - - assertTrue(connectClusterService.isNamespaceAllowedForConnectCluster(namespace, "prefix.connect-cluster")); - } - - @Test - void shouldNamespaceNotBeAllowedToWriteToConnectCluster() { - Namespace namespace = Namespace.builder() - .metadata(Metadata.builder() - .name("myNamespace") - .cluster("local") - .build()) - .spec(Namespace.NamespaceSpec.builder() - .build()) - .build(); - - ConnectCluster connectCluster = ConnectCluster.builder() - .metadata(Metadata.builder() - .name("prefix.connect-cluster") - .cluster("local") - .build()) - .spec(ConnectCluster.ConnectClusterSpec.builder() - .build()) - .build(); - - ConnectCluster connectClusterOwner = ConnectCluster.builder() - .metadata(Metadata.builder() - .name("owner.connect-cluster") - .cluster("local") - .build()) - .spec(ConnectCluster.ConnectClusterSpec.builder() - .build()) - .build(); - - when(kafkaConnectClient.version(any(), any())) - .thenReturn(Mono.just(HttpResponse.ok())); - - when(connectClusterRepository.findAllForCluster("local")) - .thenReturn(List.of(connectCluster, connectClusterOwner)); - - when(aclService.findAllGrantedToNamespace(namespace)) - .thenReturn(List.of( - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.WRITE) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) - .resource("prefix.") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) - .resource("owner.") - .build()) - .build() - )); - - when(aclService.isResourceCoveredByAcls(any(), eq("prefix.connect-cluster"))) - .thenReturn(true); - when(aclService.isResourceCoveredByAcls(any(), eq("owner.connect-cluster"))) - .thenReturn(true); - - assertFalse(connectClusterService.isNamespaceAllowedForConnectCluster(namespace, "not-allowed-prefix.cc")); - } - @Test void shouldVaultPasswordWithoutFormat() { String encryptionKey = "changeitchangeitchangeitchangeit";