Skip to content

Commit

Permalink
Handle wildcard parameter in Topic deletion API (#443)
Browse files Browse the repository at this point in the history
* Delete multiple topics with wildcard
  • Loading branch information
ThomasCAI-mlv authored Oct 3, 2024
1 parent ccc115a commit c2d293b
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class TopicController extends NamespacedResourceController {
* List topics by namespace, filtered by name parameter.
*
* @param namespace The namespace
* @param name The name parameter
* @param name The name parameter
* @return A list of topics
*/
@Get
Expand All @@ -67,7 +67,7 @@ public List<Topic> list(String namespace, @QueryValue(defaultValue = "*") String
* @param namespace The name
* @param topic The topic name
* @return The topic
* @deprecated use list(String, String name) instead.
* @deprecated use {@link #list(String, String)} instead.
*/
@Get("/{topic}")
@Deprecated(since = "1.12.0")
Expand All @@ -80,7 +80,7 @@ public Optional<Topic> get(String namespace, String topic) {
*
* @param namespace The namespace
* @param topic The topic
* @param dryrun Is dry run mode or not ?
* @param dryrun Is dry run mode or not?
* @return The created topic
*/
@Post
Expand Down Expand Up @@ -156,16 +156,55 @@ public HttpResponse<Topic> apply(String namespace, @Valid @Body Topic topic,
return formatHttpResponse(topicService.create(topic), status);
}

/**
* Delete topics.
*
* @param namespace The namespace
* @param name The name parameter
* @param dryrun Is dry run mode or not?
* @return An HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Delete
public HttpResponse<Void> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun)
throws InterruptedException, ExecutionException, TimeoutException {
Namespace ns = getNamespace(namespace);
List<Topic> topicsToDelete = topicService.findByWildcardName(ns, name);

if (topicsToDelete.isEmpty()) {
return HttpResponse.notFound();
}

if (dryrun) {
return HttpResponse.noContent();
}

topicsToDelete.forEach(topicToDelete ->
sendEventLog(
topicToDelete,
ApplyStatus.deleted,
topicToDelete.getSpec(),
null,
EMPTY_STRING));

topicService.deleteTopics(topicsToDelete);

return HttpResponse.noContent();
}

/**
* Delete a topic.
*
* @param namespace The namespace
* @param topic The topic
* @param dryrun Is dry run mode or not ?
* @param dryrun Is dry run mode or not?
* @return An HTTP response
* @deprecated use {@link #bulkDelete(String, String, boolean)} instead.
*/
@Status(HttpStatus.NO_CONTENT)
@Delete("/{topic}{?dryrun}")
@Deprecated(since = "1.13.0")
public HttpResponse<Void> delete(String namespace, String topic,
@QueryValue(defaultValue = "false") boolean dryrun)
throws InterruptedException, ExecutionException, TimeoutException {
Expand Down Expand Up @@ -194,7 +233,7 @@ public HttpResponse<Void> delete(String namespace, String topic,
EMPTY_STRING
);

topicService.delete(optionalTopic.get());
topicService.delete(topicToDelete);

return HttpResponse.noContent();
}
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/com/michelin/ns4kafka/service/TopicService.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,24 @@ public Topic create(Topic topic) {
public void delete(Topic topic) throws InterruptedException, ExecutionException, TimeoutException {
TopicAsyncExecutor topicAsyncExecutor = applicationContext.getBean(TopicAsyncExecutor.class,
Qualifiers.byName(topic.getMetadata().getCluster()));
topicAsyncExecutor.deleteTopic(topic);
topicAsyncExecutor.deleteTopics(List.of(topic));

topicRepository.delete(topic);
}

/**
* Delete multiple topics.
*
* @param topics The topics list
*/
public void deleteTopics(List<Topic> topics) throws InterruptedException, ExecutionException, TimeoutException {
TopicAsyncExecutor topicAsyncExecutor = applicationContext.getBean(TopicAsyncExecutor.class,
Qualifiers.byName(topics.getFirst().getMetadata().getCluster()));
topicAsyncExecutor.deleteTopics(topics);

topics.forEach(topic -> topicRepository.delete(topic));
}

/**
* List all topics colliding with existing topics on broker but not in Ns4Kafka.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,25 @@ public void deleteTopic(Topic topic) throws InterruptedException, ExecutionExcep
managedClusterProperties.getName());
}

/**
* Delete a list of topics.
*
* @param topics The topics to delete
*/
public void deleteTopics(List<Topic> topics) throws InterruptedException, ExecutionException, TimeoutException {
List<String> topicsNames = topics
.stream()
.map(topic -> topic.getMetadata().getName())
.toList();

getAdminClient().deleteTopics(topicsNames)
.all()
.get(30, TimeUnit.SECONDS);

log.info("Success deleting topics {} on {}", String.join(", ", topicsNames),
managedClusterProperties.getName());
}

/**
* Collect all topics on broker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,81 @@ void shouldGetTopic() {
}

@Test
void shouldDeleteMultipleTopics() throws InterruptedException, ExecutionException, TimeoutException {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
List<Topic> toDelete = List.of(
Topic.builder().metadata(Metadata.builder().name("prefix1.topic1").build()).build(),
Topic.builder().metadata(Metadata.builder().name("prefix1.topic2").build()).build());
when(topicService.findByWildcardName(ns, "prefix1.*"))
.thenReturn(toDelete);
when(securityService.username()).thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
doNothing().when(topicService).deleteTopics(toDelete);
doNothing().when(applicationEventPublisher).publishEvent(any());

HttpResponse<Void> actual = topicController.bulkDelete("test", "prefix1.*", false);

assertEquals(HttpStatus.NO_CONTENT, actual.getStatus());
}

@Test
void shouldNotDeleteMultipleTopicsWhenNotFound() throws InterruptedException, ExecutionException, TimeoutException {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));

when(topicService.findByWildcardName(ns, "topic*"))
.thenReturn(List.of());

HttpResponse<Void> actual = topicController.bulkDelete("test", "topic*", false);

assertEquals(HttpStatus.NOT_FOUND, actual.getStatus());
verify(topicService, never()).delete(any());
}

@Test
void shouldNotDeleteMultipleTopicsInDryRunMode() throws InterruptedException, ExecutionException, TimeoutException {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));

List<Topic> toDelete = List.of(
Topic.builder()
.metadata(Metadata.builder()
.name("prefix.topic")
.build())
.build());

when(topicService.findByWildcardName(ns, "prefix.topic"))
.thenReturn(toDelete);

topicController.bulkDelete("test", "prefix.topic", true);

verify(topicService, never()).delete(any());
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteTopic() throws InterruptedException, ExecutionException, TimeoutException {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand Down Expand Up @@ -205,7 +280,8 @@ void shouldDeleteTopic() throws InterruptedException, ExecutionException, Timeou
}

@Test
void shouldDeleteTopicInDryRunMode() throws InterruptedException, ExecutionException, TimeoutException {
@SuppressWarnings("deprecation")
void shouldNotDeleteTopicInDryRunMode() throws InterruptedException, ExecutionException, TimeoutException {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
Expand Down Expand Up @@ -234,6 +310,7 @@ void shouldDeleteTopicInDryRunMode() throws InterruptedException, ExecutionExcep
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteTopicWhenUnauthorized() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.michelin.ns4kafka.integration;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertLinesMatch;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -50,6 +51,7 @@
import lombok.NoArgsConstructor;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -599,6 +601,98 @@ void shouldDeleteRecordsOnCompactTopic() throws InterruptedException {
assertEquals(HttpStatus.UNPROCESSABLE_ENTITY, exception.getStatus());
}

@Test
void shouldDeleteTopics() throws InterruptedException, ExecutionException {
Topic deleteTopic = Topic.builder()
.metadata(Metadata.builder()
.name("ns1-deleteTopic")
.namespace("ns1")
.build())
.spec(TopicSpec.builder()
.partitions(3)
.replicationFactor(1)
.configs(Map.of("cleanup.policy", "delete",
"min.insync.replicas", "1",
"retention.ms", "60000"))
.build())
.build();

Topic compactTopic = Topic.builder()
.metadata(Metadata.builder()
.name("ns1-compactTopic")
.namespace("ns1")
.build())
.spec(TopicSpec.builder()
.partitions(3)
.replicationFactor(1)
.configs(Map.of("cleanup.policy", "compact",
"min.insync.replicas", "1",
"retention.ms", "60000"))
.build())
.build();

Topic topicNotToDelete = Topic.builder()
.metadata(Metadata.builder()
.name("ns1-test")
.namespace("ns1")
.build())
.spec(TopicSpec.builder()
.partitions(3)
.replicationFactor(1)
.configs(Map.of("cleanup.policy", "compact",
"min.insync.replicas", "1",
"retention.ms", "60000"))
.build())
.build();

var createResponse1 = ns4KafkaClient
.toBlocking()
.exchange(HttpRequest
.create(HttpMethod.POST, "/api/namespaces/ns1/topics")
.bearerAuth(token)
.body(deleteTopic));

assertEquals("created", createResponse1.header("X-Ns4kafka-Result"));

var createResponse2 = ns4KafkaClient
.toBlocking()
.exchange(HttpRequest
.create(HttpMethod.POST, "/api/namespaces/ns1/topics")
.bearerAuth(token)
.body(compactTopic));

assertEquals("created", createResponse2.header("X-Ns4kafka-Result"));

var createResponse3 = ns4KafkaClient
.toBlocking()
.exchange(HttpRequest
.create(HttpMethod.POST, "/api/namespaces/ns1/topics")
.bearerAuth(token)
.body(topicNotToDelete));

assertEquals("created", createResponse3.header("X-Ns4kafka-Result"));

forceTopicSynchronization();

var deleteResponse = ns4KafkaClient
.toBlocking()
.exchange(HttpRequest
.create(HttpMethod.DELETE, "/api/namespaces/ns1/topics?name=ns1-*Topic")
.bearerAuth(token));

assertEquals(HttpStatus.NO_CONTENT, deleteResponse.getStatus());

forceTopicSynchronization();

Admin kafkaClient = getAdminClient();

Map<String, TopicListing> topics = kafkaClient.listTopics().namesToListings().get();

assertFalse(topics.containsKey("ns1-deleteTopic"));
assertFalse(topics.containsKey("ns1-compactTopic"));
assertTrue(topics.containsKey("ns1-test"));
}

private void forceTopicSynchronization() throws InterruptedException {
topicAsyncExecutorList.forEach(TopicAsyncExecutor::run);

Expand Down
Loading

0 comments on commit c2d293b

Please sign in to comment.