From c2d293b263d2ee83dc243a78ffafe0d37b4c58c1 Mon Sep 17 00:00:00 2001 From: Thomas CAI <92149044+ThomasCAI-mlv@users.noreply.github.com> Date: Thu, 3 Oct 2024 17:18:57 +0200 Subject: [PATCH] Handle wildcard parameter in Topic deletion API (#443) * Delete multiple topics with wildcard --- .../controller/topic/TopicController.java | 49 +++++++++- .../ns4kafka/service/TopicService.java | 15 ++- .../service/executor/TopicAsyncExecutor.java | 19 ++++ .../controller/TopicControllerTest.java | 79 +++++++++++++++- .../integration/TopicIntegrationTest.java | 94 +++++++++++++++++++ .../ns4kafka/service/TopicServiceTest.java | 31 +++++- .../executor/TopicAsyncExecutorTest.java | 33 ++++++- 7 files changed, 309 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/topic/TopicController.java b/src/main/java/com/michelin/ns4kafka/controller/topic/TopicController.java index a8ff2f6c..bf9aee26 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/topic/TopicController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/topic/TopicController.java @@ -53,7 +53,7 @@ public class TopicController extends NamespacedResourceController { * List topics by namespace, filtered by name parameter. * * @param namespace The namespace - * @param name The name parameter + * @param name The name parameter * @return A list of topics */ @Get @@ -67,7 +67,7 @@ public List list(String namespace, @QueryValue(defaultValue = "*") String * @param namespace The name * @param topic The topic name * @return The topic - * @deprecated use list(String, String name) instead. + * @deprecated use {@link #list(String, String)} instead. */ @Get("/{topic}") @Deprecated(since = "1.12.0") @@ -80,7 +80,7 @@ public Optional get(String namespace, String topic) { * * @param namespace The namespace * @param topic The topic - * @param dryrun Is dry run mode or not ? + * @param dryrun Is dry run mode or not? * @return The created topic */ @Post @@ -156,16 +156,55 @@ public HttpResponse apply(String namespace, @Valid @Body Topic topic, return formatHttpResponse(topicService.create(topic), status); } + /** + * Delete topics. + * + * @param namespace The namespace + * @param name The name parameter + * @param dryrun Is dry run mode or not? + * @return An HTTP response + */ + @Status(HttpStatus.NO_CONTENT) + @Delete + public HttpResponse bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name, + @QueryValue(defaultValue = "false") boolean dryrun) + throws InterruptedException, ExecutionException, TimeoutException { + Namespace ns = getNamespace(namespace); + List topicsToDelete = topicService.findByWildcardName(ns, name); + + if (topicsToDelete.isEmpty()) { + return HttpResponse.notFound(); + } + + if (dryrun) { + return HttpResponse.noContent(); + } + + topicsToDelete.forEach(topicToDelete -> + sendEventLog( + topicToDelete, + ApplyStatus.deleted, + topicToDelete.getSpec(), + null, + EMPTY_STRING)); + + topicService.deleteTopics(topicsToDelete); + + return HttpResponse.noContent(); + } + /** * Delete a topic. * * @param namespace The namespace * @param topic The topic - * @param dryrun Is dry run mode or not ? + * @param dryrun Is dry run mode or not? * @return An HTTP response + * @deprecated use {@link #bulkDelete(String, String, boolean)} instead. */ @Status(HttpStatus.NO_CONTENT) @Delete("/{topic}{?dryrun}") + @Deprecated(since = "1.13.0") public HttpResponse delete(String namespace, String topic, @QueryValue(defaultValue = "false") boolean dryrun) throws InterruptedException, ExecutionException, TimeoutException { @@ -194,7 +233,7 @@ public HttpResponse delete(String namespace, String topic, EMPTY_STRING ); - topicService.delete(optionalTopic.get()); + topicService.delete(topicToDelete); return HttpResponse.noContent(); } diff --git a/src/main/java/com/michelin/ns4kafka/service/TopicService.java b/src/main/java/com/michelin/ns4kafka/service/TopicService.java index 7a4837e5..31926346 100644 --- a/src/main/java/com/michelin/ns4kafka/service/TopicService.java +++ b/src/main/java/com/michelin/ns4kafka/service/TopicService.java @@ -131,11 +131,24 @@ public Topic create(Topic topic) { public void delete(Topic topic) throws InterruptedException, ExecutionException, TimeoutException { TopicAsyncExecutor topicAsyncExecutor = applicationContext.getBean(TopicAsyncExecutor.class, Qualifiers.byName(topic.getMetadata().getCluster())); - topicAsyncExecutor.deleteTopic(topic); + topicAsyncExecutor.deleteTopics(List.of(topic)); topicRepository.delete(topic); } + /** + * Delete multiple topics. + * + * @param topics The topics list + */ + public void deleteTopics(List topics) throws InterruptedException, ExecutionException, TimeoutException { + TopicAsyncExecutor topicAsyncExecutor = applicationContext.getBean(TopicAsyncExecutor.class, + Qualifiers.byName(topics.getFirst().getMetadata().getCluster())); + topicAsyncExecutor.deleteTopics(topics); + + topics.forEach(topic -> topicRepository.delete(topic)); + } + /** * List all topics colliding with existing topics on broker but not in Ns4Kafka. * diff --git a/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java index 663fb8ea..4e12a88a 100644 --- a/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java +++ b/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java @@ -203,6 +203,25 @@ public void deleteTopic(Topic topic) throws InterruptedException, ExecutionExcep managedClusterProperties.getName()); } + /** + * Delete a list of topics. + * + * @param topics The topics to delete + */ + public void deleteTopics(List topics) throws InterruptedException, ExecutionException, TimeoutException { + List topicsNames = topics + .stream() + .map(topic -> topic.getMetadata().getName()) + .toList(); + + getAdminClient().deleteTopics(topicsNames) + .all() + .get(30, TimeUnit.SECONDS); + + log.info("Success deleting topics {} on {}", String.join(", ", topicsNames), + managedClusterProperties.getName()); + } + /** * Collect all topics on broker. * diff --git a/src/test/java/com/michelin/ns4kafka/controller/TopicControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/TopicControllerTest.java index 6e54d480..cfb65b9e 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/TopicControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/TopicControllerTest.java @@ -178,6 +178,81 @@ void shouldGetTopic() { } @Test + void shouldDeleteMultipleTopics() throws InterruptedException, ExecutionException, TimeoutException { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + List toDelete = List.of( + Topic.builder().metadata(Metadata.builder().name("prefix1.topic1").build()).build(), + Topic.builder().metadata(Metadata.builder().name("prefix1.topic2").build()).build()); + when(topicService.findByWildcardName(ns, "prefix1.*")) + .thenReturn(toDelete); + when(securityService.username()).thenReturn(Optional.of("test-user")); + when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); + doNothing().when(topicService).deleteTopics(toDelete); + doNothing().when(applicationEventPublisher).publishEvent(any()); + + HttpResponse actual = topicController.bulkDelete("test", "prefix1.*", false); + + assertEquals(HttpStatus.NO_CONTENT, actual.getStatus()); + } + + @Test + void shouldNotDeleteMultipleTopicsWhenNotFound() throws InterruptedException, ExecutionException, TimeoutException { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + + when(topicService.findByWildcardName(ns, "topic*")) + .thenReturn(List.of()); + + HttpResponse actual = topicController.bulkDelete("test", "topic*", false); + + assertEquals(HttpStatus.NOT_FOUND, actual.getStatus()); + verify(topicService, never()).delete(any()); + } + + @Test + void shouldNotDeleteMultipleTopicsInDryRunMode() throws InterruptedException, ExecutionException, TimeoutException { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + + List toDelete = List.of( + Topic.builder() + .metadata(Metadata.builder() + .name("prefix.topic") + .build()) + .build()); + + when(topicService.findByWildcardName(ns, "prefix.topic")) + .thenReturn(toDelete); + + topicController.bulkDelete("test", "prefix.topic", true); + + verify(topicService, never()).delete(any()); + } + + @Test + @SuppressWarnings("deprecation") void shouldDeleteTopic() throws InterruptedException, ExecutionException, TimeoutException { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -205,7 +280,8 @@ void shouldDeleteTopic() throws InterruptedException, ExecutionException, Timeou } @Test - void shouldDeleteTopicInDryRunMode() throws InterruptedException, ExecutionException, TimeoutException { + @SuppressWarnings("deprecation") + void shouldNotDeleteTopicInDryRunMode() throws InterruptedException, ExecutionException, TimeoutException { Namespace ns = Namespace.builder() .metadata(Metadata.builder() .name("test") @@ -234,6 +310,7 @@ void shouldDeleteTopicInDryRunMode() throws InterruptedException, ExecutionExcep } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteTopicWhenUnauthorized() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() diff --git a/src/test/java/com/michelin/ns4kafka/integration/TopicIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/TopicIntegrationTest.java index f1c7f4c0..ccc9a5cb 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/TopicIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/TopicIntegrationTest.java @@ -1,6 +1,7 @@ package com.michelin.ns4kafka.integration; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertLinesMatch; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -50,6 +51,7 @@ import lombok.NoArgsConstructor; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.TopicListing; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.config.ConfigResource; import org.junit.jupiter.api.BeforeAll; @@ -599,6 +601,98 @@ void shouldDeleteRecordsOnCompactTopic() throws InterruptedException { assertEquals(HttpStatus.UNPROCESSABLE_ENTITY, exception.getStatus()); } + @Test + void shouldDeleteTopics() throws InterruptedException, ExecutionException { + Topic deleteTopic = Topic.builder() + .metadata(Metadata.builder() + .name("ns1-deleteTopic") + .namespace("ns1") + .build()) + .spec(TopicSpec.builder() + .partitions(3) + .replicationFactor(1) + .configs(Map.of("cleanup.policy", "delete", + "min.insync.replicas", "1", + "retention.ms", "60000")) + .build()) + .build(); + + Topic compactTopic = Topic.builder() + .metadata(Metadata.builder() + .name("ns1-compactTopic") + .namespace("ns1") + .build()) + .spec(TopicSpec.builder() + .partitions(3) + .replicationFactor(1) + .configs(Map.of("cleanup.policy", "compact", + "min.insync.replicas", "1", + "retention.ms", "60000")) + .build()) + .build(); + + Topic topicNotToDelete = Topic.builder() + .metadata(Metadata.builder() + .name("ns1-test") + .namespace("ns1") + .build()) + .spec(TopicSpec.builder() + .partitions(3) + .replicationFactor(1) + .configs(Map.of("cleanup.policy", "compact", + "min.insync.replicas", "1", + "retention.ms", "60000")) + .build()) + .build(); + + var createResponse1 = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/topics") + .bearerAuth(token) + .body(deleteTopic)); + + assertEquals("created", createResponse1.header("X-Ns4kafka-Result")); + + var createResponse2 = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/topics") + .bearerAuth(token) + .body(compactTopic)); + + assertEquals("created", createResponse2.header("X-Ns4kafka-Result")); + + var createResponse3 = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/topics") + .bearerAuth(token) + .body(topicNotToDelete)); + + assertEquals("created", createResponse3.header("X-Ns4kafka-Result")); + + forceTopicSynchronization(); + + var deleteResponse = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.DELETE, "/api/namespaces/ns1/topics?name=ns1-*Topic") + .bearerAuth(token)); + + assertEquals(HttpStatus.NO_CONTENT, deleteResponse.getStatus()); + + forceTopicSynchronization(); + + Admin kafkaClient = getAdminClient(); + + Map topics = kafkaClient.listTopics().namesToListings().get(); + + assertFalse(topics.containsKey("ns1-deleteTopic")); + assertFalse(topics.containsKey("ns1-compactTopic")); + assertTrue(topics.containsKey("ns1-test")); + } + private void forceTopicSynchronization() throws InterruptedException { topicAsyncExecutorList.forEach(TopicAsyncExecutor::run); diff --git a/src/test/java/com/michelin/ns4kafka/service/TopicServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/TopicServiceTest.java index abad5aa8..64cafc6b 100644 --- a/src/test/java/com/michelin/ns4kafka/service/TopicServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/TopicServiceTest.java @@ -1160,7 +1160,7 @@ void shouldFindAllTopics() { } @Test - void shouldDelete() throws ExecutionException, InterruptedException, TimeoutException { + void shouldDeleteTopic() throws ExecutionException, InterruptedException, TimeoutException { Topic topic = Topic.builder() .metadata(Metadata.builder() .name("ns-topic1") @@ -1173,7 +1173,34 @@ void shouldDelete() throws ExecutionException, InterruptedException, TimeoutExce topicService.delete(topic); verify(topicRepository).delete(topic); - verify(topicAsyncExecutor).deleteTopic(topic); + verify(topicAsyncExecutor).deleteTopics(List.of(topic)); + } + + @Test + void shouldDeleteMultipleTopics() throws ExecutionException, InterruptedException, TimeoutException { + Topic topic1 = Topic.builder() + .metadata(Metadata.builder() + .name("ns-topic1") + .cluster("cluster") + .build()) + .build(); + + Topic topic2 = Topic.builder() + .metadata(Metadata.builder() + .name("ns-topic2") + .cluster("cluster") + .build()) + .build(); + + List topics = List.of(topic1, topic2); + + when(applicationContext.getBean(eq(TopicAsyncExecutor.class), any())).thenReturn(topicAsyncExecutor); + + topicService.deleteTopics(topics); + + verify(topicAsyncExecutor).deleteTopics(topics); + verify(topicRepository).delete(topic1); + verify(topicRepository).delete(topic2); } @Test diff --git a/src/test/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutorTest.java b/src/test/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutorTest.java index eba27fc7..0a999173 100644 --- a/src/test/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutorTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutorTest.java @@ -298,7 +298,36 @@ void shouldDeleteTopicNoTags() throws ExecutionException, InterruptedException, .build()) .build(); - topicAsyncExecutor.deleteTopic(topic); + topicAsyncExecutor.deleteTopics(List.of(topic)); + + verify(adminClient).deleteTopics(anyList()); + } + + @Test + void shouldDeleteMultipleTopics() throws ExecutionException, InterruptedException, TimeoutException { + when(deleteTopicsResult.all()).thenReturn(kafkaFuture); + when(adminClient.deleteTopics(anyList())).thenReturn(deleteTopicsResult); + when(managedClusterProperties.getAdminClient()).thenReturn(adminClient); + + Topic topic1 = Topic.builder() + .metadata(Metadata.builder() + .name("topic1") + .build()) + .spec(Topic.TopicSpec.builder() + .build()) + .build(); + + Topic topic2 = Topic.builder() + .metadata(Metadata.builder() + .name("topic2") + .build()) + .spec(Topic.TopicSpec.builder() + .build()) + .build(); + + List topics = List.of(topic1, topic2); + + topicAsyncExecutor.deleteTopics(topics); verify(adminClient).deleteTopics(anyList()); } @@ -319,7 +348,7 @@ void shouldDeleteTopicAndTags() throws ExecutionException, InterruptedException, .build()) .build(); - topicAsyncExecutor.deleteTopic(topic); + topicAsyncExecutor.deleteTopics(List.of(topic)); verify(adminClient).deleteTopics(anyList()); }