From 6a250810198c54a2982559ce938430f350b92345 Mon Sep 17 00:00:00 2001 From: E046899 Date: Mon, 30 Sep 2024 17:07:38 +0200 Subject: [PATCH 1/4] manage kstream api delete endpoint with wildcard --- .../ns4kafka/controller/StreamController.java | 49 ++++++ .../controller/StreamControllerTest.java | 143 ++++++++++++++++++ 2 files changed, 192 insertions(+) diff --git a/src/main/java/com/michelin/ns4kafka/controller/StreamController.java b/src/main/java/com/michelin/ns4kafka/controller/StreamController.java index d19872c0..bd8a26f1 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/StreamController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/StreamController.java @@ -112,9 +112,11 @@ HttpResponse apply(String namespace, @Body @Valid KafkaStream strea * @param stream The Kafka Streams * @param dryrun Is dry run mode or not ? * @return An HTTP response + * @deprecated use bulkDelete instead. */ @Status(HttpStatus.NO_CONTENT) @Delete("/{stream}{?dryrun}") + @Deprecated(since = "1.13.0") HttpResponse delete(String namespace, String stream, @QueryValue(defaultValue = "false") boolean dryrun) { Namespace ns = getNamespace(namespace); if (!streamService.isNamespaceOwnerOfKafkaStream(ns, stream)) { @@ -144,4 +146,51 @@ HttpResponse delete(String namespace, String stream, @QueryValue(defaultVa streamService.delete(ns, optionalStream.get()); return HttpResponse.noContent(); } + + /** + * Delete a Kafka Streams. + * + * @param namespace The namespace + * @param name The name parameter + * @param dryrun Is dry run mode or not ? + * @return An HTTP response + */ + @Status(HttpStatus.NO_CONTENT) + @Delete + HttpResponse bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name, + @QueryValue(defaultValue = "false") boolean dryrun) { + Namespace ns = getNamespace(namespace); + + List kafkaStreams = streamService.findByWildcardName(ns, name); + + List validationErrors = kafkaStreams.stream() + .filter(kafkaStream -> + !streamService.isNamespaceOwnerOfKafkaStream(ns, kafkaStream.getMetadata().getName())) + .map(kafkaStream -> invalidOwner(kafkaStream.getMetadata().getName())) + .toList(); + + if (!validationErrors.isEmpty()) { + throw new ResourceValidationException(KAFKA_STREAM, name, validationErrors); + } + + if (kafkaStreams.isEmpty()) { + return HttpResponse.notFound(); + } + + if (dryrun) { + return HttpResponse.noContent(); + } + kafkaStreams.forEach(kafkaStream -> { + sendEventLog( + kafkaStream, + ApplyStatus.deleted, + kafkaStream.getMetadata(), + null, + EMPTY_STRING + ); + streamService.delete(ns, kafkaStream); + }); + + return HttpResponse.noContent(); + } } diff --git a/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java index 71d8c649..0bc06e86 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java @@ -327,6 +327,7 @@ void shouldNotCreateStreamsWhenValidationErrors() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteStreams() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -359,6 +360,7 @@ void shouldDeleteStreams() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteStreamsInDryRunMode() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -388,6 +390,7 @@ void shouldDeleteStreamsInDryRunMode() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteStreamsWhenNotFound() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -411,6 +414,7 @@ void shouldNotDeleteStreamsWhenNotFound() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteStreamsWhenNotOwner() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -428,4 +432,143 @@ void shouldNotDeleteStreamsWhenNotOwner() { assertThrows(ResourceValidationException.class, () -> streamController.delete("test", "test_stream1", false)); verify(streamService, never()).delete(any(), any()); } + + @Test + void shouldDeleteMultipleStreams() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + KafkaStream stream1 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream1") + .build()) + .build(); + + KafkaStream stream2 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream2") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) + .thenReturn(true); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream2")) + .thenReturn(true); + + when(streamService.findByWildcardName(ns, "test_stream*")) + .thenReturn(List.of(stream1, stream2)); + + when(securityService.username()).thenReturn(Optional.of("test-user")); + when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); + doNothing().when(applicationEventPublisher).publishEvent(any()); + doNothing().when(streamService).delete(ns, stream1); + doNothing().when(streamService).delete(ns, stream2); + var response = streamController.bulkDelete("test", "test_stream*", false); + assertEquals(HttpStatus.NO_CONTENT, response.getStatus()); + } + + @Test + void shouldDeleteMultipleStreamsInDryRunMode() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + KafkaStream stream1 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream1") + .build()) + .build(); + + KafkaStream stream2 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream2") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) + .thenReturn(true); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream2")) + .thenReturn(true); + + when(streamService.findByWildcardName(ns, "test_stream*")) + .thenReturn(List.of(stream1, stream2)); + + var response = streamController.bulkDelete("test", "test_stream*", true); + verify(streamService, never()).delete(any(), any()); + assertEquals(HttpStatus.NO_CONTENT, response.getStatus()); + } + + @Test + void shouldNotDeleteMultipleStreamsWhenNotFound() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + + when(streamService.findByWildcardName(ns, "test_stream*")) + .thenReturn(List.of()); + + var response = streamController.bulkDelete("test", "test_stream*", false); + verify(streamService, never()).delete(any(), any()); + + assertEquals(HttpStatus.NOT_FOUND, response.getStatus()); + } + + @Test + void shouldNotDeleteMultipleStreamsWhenNotOwner() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + KafkaStream stream1 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream1") + .build()) + .build(); + + KafkaStream stream2 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream2") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) + .thenReturn(true); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream2")) + .thenReturn(false); + + when(streamService.findByWildcardName(ns, "test_stream*")) + .thenReturn(List.of(stream1, stream2)); + + assertThrows(ResourceValidationException.class, () -> + streamController.bulkDelete("test", "test_stream*", false)); + verify(streamService, never()).delete(any(), any()); + } } \ No newline at end of file From 007780c691829f021a872c4775dd0f3278eb56fb Mon Sep 17 00:00:00 2001 From: E046899 Date: Wed, 2 Oct 2024 17:41:41 +0200 Subject: [PATCH 2/4] add delete integration test for streams --- .../integration/StreamIntegrationTest.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java index 3faf89d7..5d827537 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java @@ -16,6 +16,7 @@ import com.michelin.ns4kafka.model.Namespace.NamespaceSpec; import com.michelin.ns4kafka.service.executor.AccessControlEntryAsyncExecutor; import com.michelin.ns4kafka.validation.TopicValidator; +import io.micronaut.core.type.Argument; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; @@ -163,5 +164,19 @@ void shouldVerifyCreationOfAcls() throws InterruptedException, ExecutionExceptio assertEquals(1, aclTransactionalId.size()); assertTrue(aclTransactionalId.stream().findFirst().isPresent()); assertEquals(AclOperation.WRITE, aclTransactionalId.stream().findFirst().get().entry().operation()); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.DELETE, "/api/namespaces/nskafkastream/streams?name=kstream*") + .bearerAuth(token)); + + HttpResponse> streams = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/api/namespaces/nskafkastream/streams") + .bearerAuth(token), Argument.listOf(KafkaStream.class)); + + assertEquals(0, streams.getBody().get().size()); } } From 0331bccbe6458f800572533e87dcde5213bf7350 Mon Sep 17 00:00:00 2001 From: thcai Date: Mon, 7 Oct 2024 17:51:58 +0200 Subject: [PATCH 3/4] Improve javadoc --- .../michelin/ns4kafka/controller/StreamController.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/StreamController.java b/src/main/java/com/michelin/ns4kafka/controller/StreamController.java index bd8a26f1..3ae71890 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/StreamController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/StreamController.java @@ -40,7 +40,7 @@ public class StreamController extends NamespacedResourceController { * List Kafka Streams by namespace, filtered by name parameter. * * @param namespace The namespace - * @param name The name parameter + * @param name The name parameter * @return A list of Kafka Streams */ @Get @@ -67,7 +67,7 @@ Optional get(String namespace, String stream) { * * @param namespace The namespace * @param stream The Kafka Stream - * @param dryrun Is dry run mode or not ? + * @param dryrun Is dry run mode or not? * @return An HTTP response */ @Post("/{?dryrun}") @@ -110,9 +110,9 @@ HttpResponse apply(String namespace, @Body @Valid KafkaStream strea * * @param namespace The namespace * @param stream The Kafka Streams - * @param dryrun Is dry run mode or not ? + * @param dryrun Is dry run mode or not? * @return An HTTP response - * @deprecated use bulkDelete instead. + * @deprecated use {@link #bulkDelete(String, String, boolean)} instead. */ @Status(HttpStatus.NO_CONTENT) @Delete("/{stream}{?dryrun}") @@ -152,7 +152,7 @@ HttpResponse delete(String namespace, String stream, @QueryValue(defaultVa * * @param namespace The namespace * @param name The name parameter - * @param dryrun Is dry run mode or not ? + * @param dryrun Is dry run mode or not? * @return An HTTP response */ @Status(HttpStatus.NO_CONTENT) From 5631c84eb953c31013a86c853393dcb63d90ce06 Mon Sep 17 00:00:00 2001 From: thcai Date: Mon, 7 Oct 2024 17:56:32 +0200 Subject: [PATCH 4/4] Remove indent --- .../controller/StreamControllerTest.java | 118 +++++++++--------- 1 file changed, 59 insertions(+), 59 deletions(-) diff --git a/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java index 0bc06e86..71a78309 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java @@ -436,35 +436,35 @@ void shouldNotDeleteStreamsWhenNotOwner() { @Test void shouldDeleteMultipleStreams() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); KafkaStream stream1 = KafkaStream.builder() - .metadata(Metadata.builder() - .name("test_stream1") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test_stream1") + .build()) + .build(); KafkaStream stream2 = KafkaStream.builder() - .metadata(Metadata.builder() - .name("test_stream2") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test_stream2") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) - .thenReturn(true); + .thenReturn(true); when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream2")) - .thenReturn(true); + .thenReturn(true); when(streamService.findByWildcardName(ns, "test_stream*")) - .thenReturn(List.of(stream1, stream2)); + .thenReturn(List.of(stream1, stream2)); when(securityService.username()).thenReturn(Optional.of("test-user")); when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); @@ -478,35 +478,35 @@ void shouldDeleteMultipleStreams() { @Test void shouldDeleteMultipleStreamsInDryRunMode() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); KafkaStream stream1 = KafkaStream.builder() - .metadata(Metadata.builder() - .name("test_stream1") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test_stream1") + .build()) + .build(); KafkaStream stream2 = KafkaStream.builder() - .metadata(Metadata.builder() - .name("test_stream2") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test_stream2") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) - .thenReturn(true); + .thenReturn(true); when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream2")) - .thenReturn(true); + .thenReturn(true); when(streamService.findByWildcardName(ns, "test_stream*")) - .thenReturn(List.of(stream1, stream2)); + .thenReturn(List.of(stream1, stream2)); var response = streamController.bulkDelete("test", "test_stream*", true); verify(streamService, never()).delete(any(), any()); @@ -516,17 +516,17 @@ void shouldDeleteMultipleStreamsInDryRunMode() { @Test void shouldNotDeleteMultipleStreamsWhenNotFound() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(streamService.findByWildcardName(ns, "test_stream*")) - .thenReturn(List.of()); + .thenReturn(List.of()); var response = streamController.bulkDelete("test", "test_stream*", false); verify(streamService, never()).delete(any(), any()); @@ -537,38 +537,38 @@ void shouldNotDeleteMultipleStreamsWhenNotFound() { @Test void shouldNotDeleteMultipleStreamsWhenNotOwner() { Namespace ns = Namespace.builder() - .metadata(Metadata.builder() - .name("test") - .cluster("local") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); KafkaStream stream1 = KafkaStream.builder() - .metadata(Metadata.builder() - .name("test_stream1") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test_stream1") + .build()) + .build(); KafkaStream stream2 = KafkaStream.builder() - .metadata(Metadata.builder() - .name("test_stream2") - .build()) - .build(); + .metadata(Metadata.builder() + .name("test_stream2") + .build()) + .build(); when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); + .thenReturn(Optional.of(ns)); when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) - .thenReturn(true); + .thenReturn(true); when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream2")) - .thenReturn(false); + .thenReturn(false); when(streamService.findByWildcardName(ns, "test_stream*")) - .thenReturn(List.of(stream1, stream2)); + .thenReturn(List.of(stream1, stream2)); assertThrows(ResourceValidationException.class, () -> - streamController.bulkDelete("test", "test_stream*", false)); + streamController.bulkDelete("test", "test_stream*", false)); verify(streamService, never()).delete(any(), any()); } } \ No newline at end of file