diff --git a/src/main/java/com/michelin/ns4kafka/controller/StreamController.java b/src/main/java/com/michelin/ns4kafka/controller/StreamController.java index d19872c0..3ae71890 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/StreamController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/StreamController.java @@ -40,7 +40,7 @@ public class StreamController extends NamespacedResourceController { * List Kafka Streams by namespace, filtered by name parameter. * * @param namespace The namespace - * @param name The name parameter + * @param name The name parameter * @return A list of Kafka Streams */ @Get @@ -67,7 +67,7 @@ Optional get(String namespace, String stream) { * * @param namespace The namespace * @param stream The Kafka Stream - * @param dryrun Is dry run mode or not ? + * @param dryrun Is dry run mode or not? * @return An HTTP response */ @Post("/{?dryrun}") @@ -110,11 +110,13 @@ HttpResponse apply(String namespace, @Body @Valid KafkaStream strea * * @param namespace The namespace * @param stream The Kafka Streams - * @param dryrun Is dry run mode or not ? + * @param dryrun Is dry run mode or not? * @return An HTTP response + * @deprecated use {@link #bulkDelete(String, String, boolean)} instead. */ @Status(HttpStatus.NO_CONTENT) @Delete("/{stream}{?dryrun}") + @Deprecated(since = "1.13.0") HttpResponse delete(String namespace, String stream, @QueryValue(defaultValue = "false") boolean dryrun) { Namespace ns = getNamespace(namespace); if (!streamService.isNamespaceOwnerOfKafkaStream(ns, stream)) { @@ -144,4 +146,51 @@ HttpResponse delete(String namespace, String stream, @QueryValue(defaultVa streamService.delete(ns, optionalStream.get()); return HttpResponse.noContent(); } + + /** + * Delete a Kafka Streams. + * + * @param namespace The namespace + * @param name The name parameter + * @param dryrun Is dry run mode or not? + * @return An HTTP response + */ + @Status(HttpStatus.NO_CONTENT) + @Delete + HttpResponse bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name, + @QueryValue(defaultValue = "false") boolean dryrun) { + Namespace ns = getNamespace(namespace); + + List kafkaStreams = streamService.findByWildcardName(ns, name); + + List validationErrors = kafkaStreams.stream() + .filter(kafkaStream -> + !streamService.isNamespaceOwnerOfKafkaStream(ns, kafkaStream.getMetadata().getName())) + .map(kafkaStream -> invalidOwner(kafkaStream.getMetadata().getName())) + .toList(); + + if (!validationErrors.isEmpty()) { + throw new ResourceValidationException(KAFKA_STREAM, name, validationErrors); + } + + if (kafkaStreams.isEmpty()) { + return HttpResponse.notFound(); + } + + if (dryrun) { + return HttpResponse.noContent(); + } + kafkaStreams.forEach(kafkaStream -> { + sendEventLog( + kafkaStream, + ApplyStatus.deleted, + kafkaStream.getMetadata(), + null, + EMPTY_STRING + ); + streamService.delete(ns, kafkaStream); + }); + + return HttpResponse.noContent(); + } } diff --git a/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java index 71d8c649..71a78309 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java @@ -327,6 +327,7 @@ void shouldNotCreateStreamsWhenValidationErrors() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteStreams() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -359,6 +360,7 @@ void shouldDeleteStreams() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteStreamsInDryRunMode() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -388,6 +390,7 @@ void shouldDeleteStreamsInDryRunMode() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteStreamsWhenNotFound() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -411,6 +414,7 @@ void shouldNotDeleteStreamsWhenNotFound() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteStreamsWhenNotOwner() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -428,4 +432,143 @@ void shouldNotDeleteStreamsWhenNotOwner() { assertThrows(ResourceValidationException.class, () -> streamController.delete("test", "test_stream1", false)); verify(streamService, never()).delete(any(), any()); } + + @Test + void shouldDeleteMultipleStreams() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + KafkaStream stream1 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream1") + .build()) + .build(); + + KafkaStream stream2 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream2") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) + .thenReturn(true); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream2")) + .thenReturn(true); + + when(streamService.findByWildcardName(ns, "test_stream*")) + .thenReturn(List.of(stream1, stream2)); + + when(securityService.username()).thenReturn(Optional.of("test-user")); + when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); + doNothing().when(applicationEventPublisher).publishEvent(any()); + doNothing().when(streamService).delete(ns, stream1); + doNothing().when(streamService).delete(ns, stream2); + var response = streamController.bulkDelete("test", "test_stream*", false); + assertEquals(HttpStatus.NO_CONTENT, response.getStatus()); + } + + @Test + void shouldDeleteMultipleStreamsInDryRunMode() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + KafkaStream stream1 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream1") + .build()) + .build(); + + KafkaStream stream2 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream2") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) + .thenReturn(true); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream2")) + .thenReturn(true); + + when(streamService.findByWildcardName(ns, "test_stream*")) + .thenReturn(List.of(stream1, stream2)); + + var response = streamController.bulkDelete("test", "test_stream*", true); + verify(streamService, never()).delete(any(), any()); + assertEquals(HttpStatus.NO_CONTENT, response.getStatus()); + } + + @Test + void shouldNotDeleteMultipleStreamsWhenNotFound() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + + when(streamService.findByWildcardName(ns, "test_stream*")) + .thenReturn(List.of()); + + var response = streamController.bulkDelete("test", "test_stream*", false); + verify(streamService, never()).delete(any(), any()); + + assertEquals(HttpStatus.NOT_FOUND, response.getStatus()); + } + + @Test + void shouldNotDeleteMultipleStreamsWhenNotOwner() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + KafkaStream stream1 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream1") + .build()) + .build(); + + KafkaStream stream2 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream2") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) + .thenReturn(true); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream2")) + .thenReturn(false); + + when(streamService.findByWildcardName(ns, "test_stream*")) + .thenReturn(List.of(stream1, stream2)); + + assertThrows(ResourceValidationException.class, () -> + streamController.bulkDelete("test", "test_stream*", false)); + verify(streamService, never()).delete(any(), any()); + } } \ No newline at end of file diff --git a/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java index 3faf89d7..5d827537 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java @@ -16,6 +16,7 @@ import com.michelin.ns4kafka.model.Namespace.NamespaceSpec; import com.michelin.ns4kafka.service.executor.AccessControlEntryAsyncExecutor; import com.michelin.ns4kafka.validation.TopicValidator; +import io.micronaut.core.type.Argument; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; @@ -163,5 +164,19 @@ void shouldVerifyCreationOfAcls() throws InterruptedException, ExecutionExceptio assertEquals(1, aclTransactionalId.size()); assertTrue(aclTransactionalId.stream().findFirst().isPresent()); assertEquals(AclOperation.WRITE, aclTransactionalId.stream().findFirst().get().entry().operation()); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.DELETE, "/api/namespaces/nskafkastream/streams?name=kstream*") + .bearerAuth(token)); + + HttpResponse> streams = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/api/namespaces/nskafkastream/streams") + .bearerAuth(token), Argument.listOf(KafkaStream.class)); + + assertEquals(0, streams.getBody().get().size()); } }