Skip to content

Commit

Permalink
delete connectors api with wildcard
Browse files Browse the repository at this point in the history
  • Loading branch information
adriencalime committed Sep 30, 2024
1 parent 5286f98 commit da2a85c
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.michelin.ns4kafka.service.ResourceQuotaService;
import com.michelin.ns4kafka.util.enumation.ApplyStatus;
import com.michelin.ns4kafka.util.exception.ResourceValidationException;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MutableHttpResponse;
Expand Down Expand Up @@ -165,9 +164,11 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec
* @param connector The current connector name to delete
* @param dryrun Run in dry mode or not
* @return A HTTP response
* @deprecated use bulkDelete instead.
*/
@Status(HttpStatus.NO_CONTENT)
@Delete("/{connector}{?dryrun}")
@Deprecated(since = "1.13.0")
public Mono<HttpResponse<Void>> delete(String namespace, String connector,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);
Expand Down Expand Up @@ -201,6 +202,48 @@ public Mono<HttpResponse<Void>> delete(String namespace, String connector,
.map(httpResponse -> HttpResponse.noContent());
}

/**
* Delete connectors.
*
* @param namespace The current namespace
* @param name The name parameter
* @param dryrun Run in dry mode or not
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Delete
public Mono<HttpResponse<Void>> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

List<Connector> connectors = connectorService.findByWildcardName(ns, name);

// Validate ownership
List<String> validationErrors = connectors.stream()
.filter(connector -> !connectorService.isNamespaceOwnerOfConnect(ns, connector.getMetadata().getName()))
.map(connector -> invalidOwner(connector.getMetadata().getName()))
.toList();

if (!validationErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(CONNECTOR, name, validationErrors));
}

if (connectors.isEmpty()) {
return Mono.just(HttpResponse.notFound());
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
}

return Flux.fromIterable(connectors)
.flatMap(connector -> {
sendEventLog(connector, ApplyStatus.deleted, connector.getSpec(), null, EMPTY_STRING);
return connectorService.delete(ns, connector);
})
.then(Mono.just(HttpResponse.noContent()));
}

/**
* Change the state of a connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ void shouldGetConnector() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteConnectorWhenNotOwned() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand All @@ -180,6 +181,7 @@ void shouldNotDeleteConnectorWhenNotOwned() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteConnector() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand Down Expand Up @@ -207,6 +209,7 @@ void shouldDeleteConnector() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteConnectorInDryRunMode() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand Down Expand Up @@ -236,6 +239,7 @@ void shouldDeleteConnectorInDryRunMode() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteConnectorWhenNotFound() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand All @@ -258,6 +262,138 @@ void shouldNotDeleteConnectorWhenNotFound() {
verify(connectorService, never()).delete(any(), any());
}

@Test
void shouldDeleteConnectors() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

Connector connector1 = Connector.builder().metadata(Metadata.builder().name("connect1").build()).build();
Connector connector2 = Connector.builder().metadata(Metadata.builder().name("connect2").build()).build();
when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1"))
.thenReturn(true);
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect2"))
.thenReturn(true);
when(connectorService.findByWildcardName(ns, "connect*"))
.thenReturn(List.of(connector1, connector2));
when(connectorService.delete(ns, connector1))
.thenReturn(Mono.just(HttpResponse.noContent()));
when(connectorService.delete(ns, connector2))
.thenReturn(Mono.just(HttpResponse.noContent()));
when(securityService.username()).thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
doNothing().when(applicationEventPublisher).publishEvent(any());

StepVerifier.create(connectorController.bulkDelete("test", "connect*", false))
.consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus()))
.verifyComplete();
}

@Test
void shouldNotDeleteConnectorsWhenNotFound() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectorService.findByWildcardName(ns, "connect*"))
.thenReturn(List.of());

StepVerifier.create(connectorController.bulkDelete("test", "connect*", true))
.consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus()))
.verifyComplete();

verify(connectorService, never()).delete(any(), any());
}

@Test
void shouldDeleteConnectorsInDryRunMode() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

Connector connector1 = Connector.builder()
.metadata(Metadata.builder()
.name("connect1")
.build())
.build();

Connector connector2 = Connector.builder()
.metadata(Metadata.builder()
.name("connect2")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectorService.findByWildcardName(ns, "connect*"))
.thenReturn(List.of(connector1, connector2));
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1"))
.thenReturn(true);
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect2"))
.thenReturn(true);

StepVerifier.create(connectorController.bulkDelete("test", "connect*", true))
.consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus()))
.verifyComplete();

verify(connectorService, never()).delete(any(), any());
}

@Test
void shouldNotDeleteConnectorsWhenNotOwned() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

Connector connector1 = Connector.builder()
.metadata(Metadata.builder()
.name("connect1")
.build())
.build();

Connector connector2 = Connector.builder()
.metadata(Metadata.builder()
.name("connect2")
.build())
.build();

when(connectorService.findByWildcardName(ns, "connect*"))
.thenReturn(List.of(connector1, connector2));

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1"))
.thenReturn(false);
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect2"))
.thenReturn(true);

StepVerifier.create(connectorController.bulkDelete("test", "connect*", false))
.consumeErrorWith(error -> {
assertEquals(ResourceValidationException.class, error.getClass());
assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size());
assertEquals(
"Invalid value \"connect1\" for field \"name\": namespace is not owner of the resource.",
((ResourceValidationException) error).getValidationErrors().getFirst());
})
.verify();
}

@Test
void shouldNotCreateConnectorWhenNotOwner() {
Connector connector = Connector.builder()
Expand Down

0 comments on commit da2a85c

Please sign in to comment.