Skip to content

Commit

Permalink
Handle wildcard parameter in Connector deletion API (#451)
Browse files Browse the repository at this point in the history
* delete connectors api with wildcard

* add delete integration test for connectors

* Improve javadoc & remove extra indents

* Increase sleep time

---------

Co-authored-by: thcai <[email protected]>
  • Loading branch information
adriencalime and ThomasCAI-mlv authored Oct 11, 2024
1 parent 18ff01f commit be5f1be
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.michelin.ns4kafka.service.ResourceQuotaService;
import com.michelin.ns4kafka.util.enumation.ApplyStatus;
import com.michelin.ns4kafka.util.exception.ResourceValidationException;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MutableHttpResponse;
Expand Down Expand Up @@ -66,7 +65,7 @@ public List<Connector> list(String namespace, @QueryValue(defaultValue = "*") St
* @param namespace The namespace
* @param connector The name
* @return A connector
* @deprecated use list(String, String name) instead.
* @deprecated use {@link #list(String, String)} instead.
*/
@Get("/{connector}")
@Deprecated(since = "1.12.0")
Expand Down Expand Up @@ -165,9 +164,11 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec
* @param connector The current connector name to delete
* @param dryrun Run in dry mode or not
* @return A HTTP response
* @deprecated use {@link #bulkDelete(String, String, boolean)} instead.
*/
@Status(HttpStatus.NO_CONTENT)
@Delete("/{connector}{?dryrun}")
@Deprecated(since = "1.13.0")
public Mono<HttpResponse<Void>> delete(String namespace, String connector,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);
Expand Down Expand Up @@ -201,6 +202,48 @@ public Mono<HttpResponse<Void>> delete(String namespace, String connector,
.map(httpResponse -> HttpResponse.noContent());
}

/**
* Delete connectors.
*
* @param namespace The current namespace
* @param name The name parameter
* @param dryrun Run in dry mode or not
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Delete
public Mono<HttpResponse<Void>> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

List<Connector> connectors = connectorService.findByWildcardName(ns, name);

// Validate ownership
List<String> validationErrors = connectors.stream()
.filter(connector -> !connectorService.isNamespaceOwnerOfConnect(ns, connector.getMetadata().getName()))
.map(connector -> invalidOwner(connector.getMetadata().getName()))
.toList();

if (!validationErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(CONNECTOR, name, validationErrors));
}

if (connectors.isEmpty()) {
return Mono.just(HttpResponse.notFound());
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
}

return Flux.fromIterable(connectors)
.flatMap(connector -> {
sendEventLog(connector, ApplyStatus.deleted, connector.getSpec(), null, EMPTY_STRING);
return connectorService.delete(ns, connector);
})
.then(Mono.just(HttpResponse.noContent()));
}

/**
* Change the state of a connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ void shouldGetConnector() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteConnectorWhenNotOwned() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand All @@ -180,6 +181,7 @@ void shouldNotDeleteConnectorWhenNotOwned() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteConnector() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand Down Expand Up @@ -207,6 +209,7 @@ void shouldDeleteConnector() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteConnectorInDryRunMode() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand Down Expand Up @@ -236,6 +239,7 @@ void shouldDeleteConnectorInDryRunMode() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteConnectorWhenNotFound() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand All @@ -258,6 +262,138 @@ void shouldNotDeleteConnectorWhenNotFound() {
verify(connectorService, never()).delete(any(), any());
}

@Test
void shouldDeleteConnectors() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

Connector connector1 = Connector.builder().metadata(Metadata.builder().name("connect1").build()).build();
Connector connector2 = Connector.builder().metadata(Metadata.builder().name("connect2").build()).build();
when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1"))
.thenReturn(true);
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect2"))
.thenReturn(true);
when(connectorService.findByWildcardName(ns, "connect*"))
.thenReturn(List.of(connector1, connector2));
when(connectorService.delete(ns, connector1))
.thenReturn(Mono.just(HttpResponse.noContent()));
when(connectorService.delete(ns, connector2))
.thenReturn(Mono.just(HttpResponse.noContent()));
when(securityService.username()).thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
doNothing().when(applicationEventPublisher).publishEvent(any());

StepVerifier.create(connectorController.bulkDelete("test", "connect*", false))
.consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus()))
.verifyComplete();
}

@Test
void shouldNotDeleteConnectorsWhenNotFound() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectorService.findByWildcardName(ns, "connect*"))
.thenReturn(List.of());

StepVerifier.create(connectorController.bulkDelete("test", "connect*", true))
.consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus()))
.verifyComplete();

verify(connectorService, never()).delete(any(), any());
}

@Test
void shouldDeleteConnectorsInDryRunMode() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

Connector connector1 = Connector.builder()
.metadata(Metadata.builder()
.name("connect1")
.build())
.build();

Connector connector2 = Connector.builder()
.metadata(Metadata.builder()
.name("connect2")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectorService.findByWildcardName(ns, "connect*"))
.thenReturn(List.of(connector1, connector2));
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1"))
.thenReturn(true);
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect2"))
.thenReturn(true);

StepVerifier.create(connectorController.bulkDelete("test", "connect*", true))
.consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus()))
.verifyComplete();

verify(connectorService, never()).delete(any(), any());
}

@Test
void shouldNotDeleteConnectorsWhenNotOwned() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

Connector connector1 = Connector.builder()
.metadata(Metadata.builder()
.name("connect1")
.build())
.build();

Connector connector2 = Connector.builder()
.metadata(Metadata.builder()
.name("connect2")
.build())
.build();

when(connectorService.findByWildcardName(ns, "connect*"))
.thenReturn(List.of(connector1, connector2));

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1"))
.thenReturn(false);
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect2"))
.thenReturn(true);

StepVerifier.create(connectorController.bulkDelete("test", "connect*", false))
.consumeErrorWith(error -> {
assertEquals(ResourceValidationException.class, error.getClass());
assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size());
assertEquals(
"Invalid value \"connect1\" for field \"name\": namespace is not owner of the resource.",
((ResourceValidationException) error).getValidationErrors().getFirst());
})
.verify();
}

@Test
void shouldNotCreateConnectorWhenNotOwner() {
Connector connector = Connector.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.michelin.ns4kafka.validation.ConnectValidator;
import com.michelin.ns4kafka.validation.TopicValidator;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
Expand Down Expand Up @@ -323,6 +324,20 @@ void shouldDeployConnectors() throws InterruptedException {

assertTrue(actualConnectorWithFillParameter.config().containsKey("file"));
assertEquals("test", actualConnectorWithFillParameter.config().get("file"));

ns4KafkaClient
.toBlocking()
.exchange(HttpRequest
.create(HttpMethod.DELETE, "/api/namespaces/ns1/connectors?name=ns1*")
.bearerAuth(token));

HttpResponse<List<Connector>> connectors = ns4KafkaClient
.toBlocking()
.exchange(HttpRequest
.create(HttpMethod.GET, "/api/namespaces/ns1/connectors")
.bearerAuth(token), Argument.listOf(Connector.class));

assertEquals(0, connectors.getBody().get().size());
}

@Test
Expand Down Expand Up @@ -610,7 +625,7 @@ private void forceConnectorSynchronization() throws InterruptedException {
.blockLast();

// Wait for Kafka Connect to deploy and update connectors
Thread.sleep(3000);
Thread.sleep(4000);
}

private void waitForConnectorAndTasksToBeInState(String connector, Connector.TaskState state)
Expand Down

0 comments on commit be5f1be

Please sign in to comment.