Skip to content

Commit

Permalink
Handle wildcard parameter in Connect Clusters deletion API (#444)
Browse files Browse the repository at this point in the history
  • Loading branch information
adriencalime authored Oct 2, 2024
1 parent 4b9d93a commit ccc115a
Show file tree
Hide file tree
Showing 5 changed files with 476 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,11 @@ public Mono<HttpResponse<ConnectCluster>> apply(String namespace, @Body @Valid C
* @param connectCluster The current connect cluster name to delete
* @param dryrun Run in dry mode or not
* @return A HTTP response
* @deprecated use {@link #bulkDelete(String, String, boolean)} instead.
*/
@Status(HttpStatus.NO_CONTENT)
@Delete("/{connectCluster}{?dryrun}")
@Deprecated(since = "1.13.0")
public HttpResponse<Void> delete(String namespace, String connectCluster,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);
Expand Down Expand Up @@ -166,20 +168,66 @@ public HttpResponse<Void> delete(String namespace, String connectCluster,
return HttpResponse.noContent();
}

ConnectCluster connectClusterToDelete = optionalConnectCluster.get();
performDeletion(optionalConnectCluster.get());
return HttpResponse.noContent();
}

sendEventLog(
connectClusterToDelete,
ApplyStatus.deleted,
connectClusterToDelete.getSpec(),
null,
EMPTY_STRING
);
/**
* Delete Kafka Connect clusters.
*
* @param namespace The current namespace
* @param name The name parameter
* @param dryrun Run in dry mode or not
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Delete
public HttpResponse<Void> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

connectClusterService.delete(connectClusterToDelete);
List<ConnectCluster> connectClusters = connectClusterService.findByWildcardNameWithOwnerPermission(ns, name);

List<String> validationErrors = new ArrayList<>();
connectClusters.forEach(cc -> {
List<Connector> connectors = connectorService.findAllByConnectCluster(ns, cc.getMetadata().getName());
if (!connectors.isEmpty()) {
validationErrors.add(invalidConnectClusterDeleteOperation(cc.getMetadata().getName(), connectors));
}
});

if (!validationErrors.isEmpty()) {
throw new ResourceValidationException(CONNECT_CLUSTER, name, validationErrors);
}

if (connectClusters.isEmpty()) {
return HttpResponse.notFound();
}

if (dryrun) {
return HttpResponse.noContent();
}

connectClusters.forEach(this::performDeletion);
return HttpResponse.noContent();
}

/**
* Perform the deletion of the connectCluster and send an event log.
*
* @param connectCluster The connectCluster to delete
*/
private void performDeletion(ConnectCluster connectCluster) {
sendEventLog(
connectCluster,
ApplyStatus.deleted,
connectCluster.getSpec(),
null,
EMPTY_STRING
);
connectClusterService.delete(connectCluster);
}

/**
* List vault Kafka Connect clusters by namespace.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public Flux<ConnectCluster> findAll(boolean all) {
if (all) {
results.addAll(managedClusterProperties
.stream()
.filter(cluster -> cluster.getConnects() != null)
.map(config -> config.getConnects().entrySet()
.stream()
.map(entry ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ void shouldGetConnectCluster() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteConnectClusterWhenNotOwner() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand All @@ -177,6 +178,7 @@ void shouldNotDeleteConnectClusterWhenNotOwner() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteConnectClusterWhenNotFound() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand All @@ -197,6 +199,7 @@ void shouldNotDeleteConnectClusterWhenNotFound() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteConnectCluster() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand Down Expand Up @@ -229,6 +232,7 @@ void shouldDeleteConnectCluster() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteConnectClusterInDryRunMode() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand Down Expand Up @@ -259,6 +263,7 @@ void shouldDeleteConnectClusterInDryRunMode() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteConnectClusterWithConnectorsAssociated() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand Down Expand Up @@ -290,6 +295,140 @@ void shouldNotDeleteConnectClusterWithConnectorsAssociated() {
result.getValidationErrors().getFirst());
}

@Test
void shouldDeleteConnectClusters() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

ConnectCluster connectCluster1 = ConnectCluster.builder()
.metadata(Metadata.builder()
.name("connect-cluster1")
.build())
.build();

ConnectCluster connectCluster2 = ConnectCluster.builder()
.metadata(Metadata.builder()
.name("connect-cluster2")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectorService.findAllByConnectCluster(ns, "connect-cluster1"))
.thenReturn(List.of());
when(connectorService.findAllByConnectCluster(ns, "connect-cluster2"))
.thenReturn(List.of());
when(connectClusterService.findByWildcardNameWithOwnerPermission(ns, "connect-cluster*"))
.thenReturn(List.of(connectCluster1, connectCluster2));
doNothing().when(connectClusterService).delete(connectCluster1);
doNothing().when(connectClusterService).delete(connectCluster2);
when(securityService.username()).thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
doNothing().when(applicationEventPublisher).publishEvent(any());

HttpResponse<Void> actual = connectClusterController.bulkDelete("test", "connect-cluster*", false);
assertEquals(HttpStatus.NO_CONTENT, actual.getStatus());
}

@Test
void shouldDeleteConnectClustersInDryRunMode() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

ConnectCluster connectCluster = ConnectCluster.builder()
.metadata(Metadata.builder()
.name("connect-cluster")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectorService.findAllByConnectCluster(ns, "connect-cluster"))
.thenReturn(List.of());
when(connectClusterService.findByWildcardNameWithOwnerPermission(ns, "connect-cluster*"))
.thenReturn(List.of(connectCluster));

HttpResponse<Void> actual = connectClusterController.bulkDelete("test", "connect-cluster*", true);
assertEquals(HttpStatus.NO_CONTENT, actual.getStatus());

verify(connectClusterService, never()).delete(any());
}

@Test
void shouldNotDeleteConnectClustersWhenNotFound() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectClusterService.findByWildcardNameWithOwnerPermission(ns, "connect-cluster*"))
.thenReturn(List.of());

HttpResponse<Void> actual = connectClusterController.bulkDelete("test", "connect-cluster*", false);
assertEquals(HttpStatus.NOT_FOUND, actual.getStatus());
}

@Test
void shouldNotDeleteConnectClustersWithConnectorsAssociated() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

Connector connector = Connector.builder()
.metadata(Metadata.builder()
.name("connect1")
.build())
.build();

ConnectCluster connectCluster1 = ConnectCluster.builder()
.metadata(Metadata.builder()
.name("connect-cluster1")
.build())
.build();

ConnectCluster connectCluster2 = ConnectCluster.builder()
.metadata(Metadata.builder()
.name("connect-cluster2")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectorService.findAllByConnectCluster(ns, "connect-cluster1"))
.thenReturn(List.of());
when(connectorService.findAllByConnectCluster(ns, "connect-cluster2"))
.thenReturn(List.of());
when(connectClusterService.findByWildcardNameWithOwnerPermission(ns, "connect-cluster*"))
.thenReturn(List.of(connectCluster1, connectCluster2));

when(connectorService.findAllByConnectCluster(ns, "connect-cluster2"))
.thenReturn(List.of(connector));

ResourceValidationException result = assertThrows(ResourceValidationException.class,
() -> connectClusterController.bulkDelete("test", "connect-cluster*", false));

assertEquals(1, result.getValidationErrors().size());
assertEquals(
"Invalid \"delete\" operation: The Kafka Connect \"connect-cluster2\" has 1 deployed connector(s): "
+ "connect1. Please remove the associated connector(s) before deleting it.",
result.getValidationErrors().getFirst());
}

@Test
void shouldCreateConnectCluster() {
Namespace ns = Namespace.builder()
Expand Down
Loading

0 comments on commit ccc115a

Please sign in to comment.