diff --git a/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java b/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java index 834e5786..e0e27810 100644 --- a/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java +++ b/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java @@ -264,9 +264,11 @@ public ConnectCluster create(ConnectCluster connectCluster) { public Mono> validateConnectClusterCreation(ConnectCluster connectCluster) { List errors = new ArrayList<>(); - if (managedClusterProperties.stream().anyMatch(cluster -> - cluster.getConnects().entrySet().stream() - .anyMatch(entry -> entry.getKey().equals(connectCluster.getMetadata().getName())))) { + if (managedClusterProperties.stream() + .filter(cluster -> cluster.getConnects() != null) + .anyMatch(cluster -> cluster.getConnects().entrySet() + .stream() + .anyMatch(entry -> entry.getKey().equals(connectCluster.getMetadata().getName())))) { errors.add(invalidConnectClusterNameAlreadyExistGlobally(connectCluster.getMetadata().getName())); } diff --git a/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java index e21c0180..d3f00b57 100644 --- a/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java @@ -541,6 +541,47 @@ void createCredentialsEncrypted() { assertNotEquals("myAES256Salt", connectCluster.getSpec().getAes256Salt()); } + @Test + void validateConnectClusterCreationWhenNoNs4ConnectConfig() { + ManagedClusterProperties kafka = new ManagedClusterProperties("local"); + when(managedClusterPropertiesList.stream()).thenReturn(Stream.of(kafka)); + when(httpClient.retrieve(any(MutableHttpRequest.class), eq(ServerInfo.class))) + .thenReturn(Mono.just(ServerInfo.builder().build())); + + ConnectCluster connectCluster = ConnectCluster.builder() + .metadata(Metadata.builder().name("test-connect") + .build()) + .spec(ConnectCluster.ConnectClusterSpec.builder() + .url("https://after") + .build()) + .build(); + + StepVerifier.create(connectClusterService.validateConnectClusterCreation(connectCluster)) + .consumeNextWith(errors -> assertTrue(errors.isEmpty())) + .verifyComplete(); + } + + @Test + void validateConnectClusterCreationNotAlreadyDefined() { + ManagedClusterProperties kafka = new ManagedClusterProperties("local"); + kafka.setConnects(Map.of("test-connect", new ManagedClusterProperties.ConnectProperties())); + when(managedClusterPropertiesList.stream()).thenReturn(Stream.of(kafka)); + when(httpClient.retrieve(any(MutableHttpRequest.class), eq(ServerInfo.class))) + .thenReturn(Mono.just(ServerInfo.builder().build())); + + ConnectCluster connectCluster = ConnectCluster.builder() + .metadata(Metadata.builder().name("test-connect2") + .build()) + .spec(ConnectCluster.ConnectClusterSpec.builder() + .url("https://after") + .build()) + .build(); + + StepVerifier.create(connectClusterService.validateConnectClusterCreation(connectCluster)) + .consumeNextWith(errors -> assertTrue(errors.isEmpty())) + .verifyComplete(); + } + @Test void validateConnectClusterCreationAlreadyDefined() { ManagedClusterProperties kafka = new ManagedClusterProperties("local");