Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle wildcard parameter in Connector deletion API #451

Merged
merged 4 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.michelin.ns4kafka.service.ResourceQuotaService;
import com.michelin.ns4kafka.util.enumation.ApplyStatus;
import com.michelin.ns4kafka.util.exception.ResourceValidationException;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MutableHttpResponse;
Expand Down Expand Up @@ -66,7 +65,7 @@ public List<Connector> list(String namespace, @QueryValue(defaultValue = "*") St
* @param namespace The namespace
* @param connector The name
* @return A connector
* @deprecated use list(String, String name) instead.
* @deprecated use {@link #list(String, String)} instead.
*/
@Get("/{connector}")
@Deprecated(since = "1.12.0")
Expand Down Expand Up @@ -165,9 +164,11 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec
* @param connector The current connector name to delete
* @param dryrun Run in dry mode or not
* @return A HTTP response
* @deprecated use {@link #bulkDelete(String, String, boolean)} instead.
*/
@Status(HttpStatus.NO_CONTENT)
@Delete("/{connector}{?dryrun}")
@Deprecated(since = "1.13.0")
public Mono<HttpResponse<Void>> delete(String namespace, String connector,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);
Expand Down Expand Up @@ -201,6 +202,48 @@ public Mono<HttpResponse<Void>> delete(String namespace, String connector,
.map(httpResponse -> HttpResponse.noContent());
}

/**
* Delete connectors.
*
* @param namespace The current namespace
* @param name The name parameter
* @param dryrun Run in dry mode or not
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Delete
public Mono<HttpResponse<Void>> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

List<Connector> connectors = connectorService.findByWildcardName(ns, name);

// Validate ownership
List<String> validationErrors = connectors.stream()
.filter(connector -> !connectorService.isNamespaceOwnerOfConnect(ns, connector.getMetadata().getName()))
.map(connector -> invalidOwner(connector.getMetadata().getName()))
.toList();

if (!validationErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(CONNECTOR, name, validationErrors));
}

if (connectors.isEmpty()) {
return Mono.just(HttpResponse.notFound());
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
}

return Flux.fromIterable(connectors)
.flatMap(connector -> {
sendEventLog(connector, ApplyStatus.deleted, connector.getSpec(), null, EMPTY_STRING);
return connectorService.delete(ns, connector);
})
.then(Mono.just(HttpResponse.noContent()));
}

/**
* Change the state of a connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ void shouldGetConnector() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteConnectorWhenNotOwned() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand All @@ -180,6 +181,7 @@ void shouldNotDeleteConnectorWhenNotOwned() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteConnector() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand Down Expand Up @@ -207,6 +209,7 @@ void shouldDeleteConnector() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteConnectorInDryRunMode() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand Down Expand Up @@ -236,6 +239,7 @@ void shouldDeleteConnectorInDryRunMode() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteConnectorWhenNotFound() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand All @@ -258,6 +262,138 @@ void shouldNotDeleteConnectorWhenNotFound() {
verify(connectorService, never()).delete(any(), any());
}

@Test
void shouldDeleteConnectors() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

Connector connector1 = Connector.builder().metadata(Metadata.builder().name("connect1").build()).build();
Connector connector2 = Connector.builder().metadata(Metadata.builder().name("connect2").build()).build();
when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1"))
.thenReturn(true);
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect2"))
.thenReturn(true);
when(connectorService.findByWildcardName(ns, "connect*"))
.thenReturn(List.of(connector1, connector2));
when(connectorService.delete(ns, connector1))
.thenReturn(Mono.just(HttpResponse.noContent()));
when(connectorService.delete(ns, connector2))
.thenReturn(Mono.just(HttpResponse.noContent()));
when(securityService.username()).thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
doNothing().when(applicationEventPublisher).publishEvent(any());

StepVerifier.create(connectorController.bulkDelete("test", "connect*", false))
.consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus()))
.verifyComplete();
}

@Test
void shouldNotDeleteConnectorsWhenNotFound() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectorService.findByWildcardName(ns, "connect*"))
.thenReturn(List.of());

StepVerifier.create(connectorController.bulkDelete("test", "connect*", true))
.consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus()))
.verifyComplete();

verify(connectorService, never()).delete(any(), any());
}

@Test
void shouldDeleteConnectorsInDryRunMode() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

Connector connector1 = Connector.builder()
.metadata(Metadata.builder()
.name("connect1")
.build())
.build();

Connector connector2 = Connector.builder()
.metadata(Metadata.builder()
.name("connect2")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectorService.findByWildcardName(ns, "connect*"))
.thenReturn(List.of(connector1, connector2));
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1"))
.thenReturn(true);
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect2"))
.thenReturn(true);

StepVerifier.create(connectorController.bulkDelete("test", "connect*", true))
.consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus()))
.verifyComplete();

verify(connectorService, never()).delete(any(), any());
}

@Test
void shouldNotDeleteConnectorsWhenNotOwned() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

Connector connector1 = Connector.builder()
.metadata(Metadata.builder()
.name("connect1")
.build())
.build();

Connector connector2 = Connector.builder()
.metadata(Metadata.builder()
.name("connect2")
.build())
.build();

when(connectorService.findByWildcardName(ns, "connect*"))
.thenReturn(List.of(connector1, connector2));

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1"))
.thenReturn(false);
when(connectorService.isNamespaceOwnerOfConnect(ns, "connect2"))
.thenReturn(true);

StepVerifier.create(connectorController.bulkDelete("test", "connect*", false))
.consumeErrorWith(error -> {
assertEquals(ResourceValidationException.class, error.getClass());
assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size());
assertEquals(
"Invalid value \"connect1\" for field \"name\": namespace is not owner of the resource.",
((ResourceValidationException) error).getValidationErrors().getFirst());
})
.verify();
}

@Test
void shouldNotCreateConnectorWhenNotOwner() {
Connector connector = Connector.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.michelin.ns4kafka.validation.ConnectValidator;
import com.michelin.ns4kafka.validation.TopicValidator;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
Expand Down Expand Up @@ -323,6 +324,20 @@ void shouldDeployConnectors() throws InterruptedException {

assertTrue(actualConnectorWithFillParameter.config().containsKey("file"));
assertEquals("test", actualConnectorWithFillParameter.config().get("file"));

ns4KafkaClient
.toBlocking()
.exchange(HttpRequest
.create(HttpMethod.DELETE, "/api/namespaces/ns1/connectors?name=ns1*")
.bearerAuth(token));

HttpResponse<List<Connector>> connectors = ns4KafkaClient
.toBlocking()
.exchange(HttpRequest
.create(HttpMethod.GET, "/api/namespaces/ns1/connectors")
.bearerAuth(token), Argument.listOf(Connector.class));

assertEquals(0, connectors.getBody().get().size());
}

@Test
Expand Down Expand Up @@ -610,7 +625,7 @@ private void forceConnectorSynchronization() throws InterruptedException {
.blockLast();

// Wait for Kafka Connect to deploy and update connectors
Thread.sleep(3000);
Thread.sleep(4000);
}

private void waitForConnectorAndTasksToBeInState(String connector, Connector.TaskState state)
Expand Down