Skip to content

Commit

Permalink
Cc creation fix included
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasCAI-mlv committed Jul 30, 2024
1 parent c21c8ce commit b571b19
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,11 @@ public ConnectCluster create(ConnectCluster connectCluster) {
public Mono<List<String>> validateConnectClusterCreation(ConnectCluster connectCluster) {
List<String> errors = new ArrayList<>();

if (managedClusterProperties.stream().anyMatch(cluster ->
cluster.getConnects().entrySet().stream()
.anyMatch(entry -> entry.getKey().equals(connectCluster.getMetadata().getName())))) {
if (managedClusterProperties.stream()
.filter(cluster -> cluster.getConnects() != null)
.anyMatch(cluster -> cluster.getConnects().entrySet()
.stream()
.anyMatch(entry -> entry.getKey().equals(connectCluster.getMetadata().getName())))) {
errors.add(invalidConnectClusterNameAlreadyExistGlobally(connectCluster.getMetadata().getName()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,47 @@ void createCredentialsEncrypted() {
assertNotEquals("myAES256Salt", connectCluster.getSpec().getAes256Salt());
}

@Test
void validateConnectClusterCreationWhenNoNs4ConnectConfig() {
ManagedClusterProperties kafka = new ManagedClusterProperties("local");
when(managedClusterPropertiesList.stream()).thenReturn(Stream.of(kafka));
when(httpClient.retrieve(any(MutableHttpRequest.class), eq(ServerInfo.class)))
.thenReturn(Mono.just(ServerInfo.builder().build()));

ConnectCluster connectCluster = ConnectCluster.builder()
.metadata(Metadata.builder().name("test-connect")
.build())
.spec(ConnectCluster.ConnectClusterSpec.builder()
.url("https://after")
.build())
.build();

StepVerifier.create(connectClusterService.validateConnectClusterCreation(connectCluster))
.consumeNextWith(errors -> assertTrue(errors.isEmpty()))
.verifyComplete();
}

@Test
void validateConnectClusterCreationNotAlreadyDefined() {
ManagedClusterProperties kafka = new ManagedClusterProperties("local");
kafka.setConnects(Map.of("test-connect", new ManagedClusterProperties.ConnectProperties()));
when(managedClusterPropertiesList.stream()).thenReturn(Stream.of(kafka));
when(httpClient.retrieve(any(MutableHttpRequest.class), eq(ServerInfo.class)))
.thenReturn(Mono.just(ServerInfo.builder().build()));

ConnectCluster connectCluster = ConnectCluster.builder()
.metadata(Metadata.builder().name("test-connect2")
.build())
.spec(ConnectCluster.ConnectClusterSpec.builder()
.url("https://after")
.build())
.build();

StepVerifier.create(connectClusterService.validateConnectClusterCreation(connectCluster))
.consumeNextWith(errors -> assertTrue(errors.isEmpty()))
.verifyComplete();
}

@Test
void validateConnectClusterCreationAlreadyDefined() {
ManagedClusterProperties kafka = new ManagedClusterProperties("local");
Expand Down

0 comments on commit b571b19

Please sign in to comment.