Skip to content

Commit

Permalink
Handle wildcard parameter in Kstream deletion API (#455)
Browse files Browse the repository at this point in the history
* Handle wildcard parameter in Kstream deletion API
  • Loading branch information
adriencalime authored Oct 9, 2024
1 parent 1075463 commit c50b6fd
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class StreamController extends NamespacedResourceController {
* List Kafka Streams by namespace, filtered by name parameter.
*
* @param namespace The namespace
* @param name The name parameter
* @param name The name parameter
* @return A list of Kafka Streams
*/
@Get
Expand All @@ -67,7 +67,7 @@ Optional<KafkaStream> get(String namespace, String stream) {
*
* @param namespace The namespace
* @param stream The Kafka Stream
* @param dryrun Is dry run mode or not ?
* @param dryrun Is dry run mode or not?
* @return An HTTP response
*/
@Post("/{?dryrun}")
Expand Down Expand Up @@ -110,11 +110,13 @@ HttpResponse<KafkaStream> apply(String namespace, @Body @Valid KafkaStream strea
*
* @param namespace The namespace
* @param stream The Kafka Streams
* @param dryrun Is dry run mode or not ?
* @param dryrun Is dry run mode or not?
* @return An HTTP response
* @deprecated use {@link #bulkDelete(String, String, boolean)} instead.
*/
@Status(HttpStatus.NO_CONTENT)
@Delete("/{stream}{?dryrun}")
@Deprecated(since = "1.13.0")
HttpResponse<Void> delete(String namespace, String stream, @QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);
if (!streamService.isNamespaceOwnerOfKafkaStream(ns, stream)) {
Expand Down Expand Up @@ -144,4 +146,51 @@ HttpResponse<Void> delete(String namespace, String stream, @QueryValue(defaultVa
streamService.delete(ns, optionalStream.get());
return HttpResponse.noContent();
}

/**
* Delete a Kafka Streams.
*
* @param namespace The namespace
* @param name The name parameter
* @param dryrun Is dry run mode or not?
* @return An HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Delete
HttpResponse<Void> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

List<KafkaStream> kafkaStreams = streamService.findByWildcardName(ns, name);

List<String> validationErrors = kafkaStreams.stream()
.filter(kafkaStream ->
!streamService.isNamespaceOwnerOfKafkaStream(ns, kafkaStream.getMetadata().getName()))
.map(kafkaStream -> invalidOwner(kafkaStream.getMetadata().getName()))
.toList();

if (!validationErrors.isEmpty()) {
throw new ResourceValidationException(KAFKA_STREAM, name, validationErrors);
}

if (kafkaStreams.isEmpty()) {
return HttpResponse.notFound();
}

if (dryrun) {
return HttpResponse.noContent();
}
kafkaStreams.forEach(kafkaStream -> {
sendEventLog(
kafkaStream,
ApplyStatus.deleted,
kafkaStream.getMetadata(),
null,
EMPTY_STRING
);
streamService.delete(ns, kafkaStream);
});

return HttpResponse.noContent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ void shouldNotCreateStreamsWhenValidationErrors() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteStreams() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand Down Expand Up @@ -359,6 +360,7 @@ void shouldDeleteStreams() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteStreamsInDryRunMode() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand Down Expand Up @@ -388,6 +390,7 @@ void shouldDeleteStreamsInDryRunMode() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteStreamsWhenNotFound() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand All @@ -411,6 +414,7 @@ void shouldNotDeleteStreamsWhenNotFound() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteStreamsWhenNotOwner() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
Expand All @@ -428,4 +432,143 @@ void shouldNotDeleteStreamsWhenNotOwner() {
assertThrows(ResourceValidationException.class, () -> streamController.delete("test", "test_stream1", false));
verify(streamService, never()).delete(any(), any());
}

@Test
void shouldDeleteMultipleStreams() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

KafkaStream stream1 = KafkaStream.builder()
.metadata(Metadata.builder()
.name("test_stream1")
.build())
.build();

KafkaStream stream2 = KafkaStream.builder()
.metadata(Metadata.builder()
.name("test_stream2")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));

when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1"))
.thenReturn(true);

when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream2"))
.thenReturn(true);

when(streamService.findByWildcardName(ns, "test_stream*"))
.thenReturn(List.of(stream1, stream2));

when(securityService.username()).thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
doNothing().when(applicationEventPublisher).publishEvent(any());
doNothing().when(streamService).delete(ns, stream1);
doNothing().when(streamService).delete(ns, stream2);
var response = streamController.bulkDelete("test", "test_stream*", false);
assertEquals(HttpStatus.NO_CONTENT, response.getStatus());
}

@Test
void shouldDeleteMultipleStreamsInDryRunMode() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

KafkaStream stream1 = KafkaStream.builder()
.metadata(Metadata.builder()
.name("test_stream1")
.build())
.build();

KafkaStream stream2 = KafkaStream.builder()
.metadata(Metadata.builder()
.name("test_stream2")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));

when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1"))
.thenReturn(true);

when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream2"))
.thenReturn(true);

when(streamService.findByWildcardName(ns, "test_stream*"))
.thenReturn(List.of(stream1, stream2));

var response = streamController.bulkDelete("test", "test_stream*", true);
verify(streamService, never()).delete(any(), any());
assertEquals(HttpStatus.NO_CONTENT, response.getStatus());
}

@Test
void shouldNotDeleteMultipleStreamsWhenNotFound() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));

when(streamService.findByWildcardName(ns, "test_stream*"))
.thenReturn(List.of());

var response = streamController.bulkDelete("test", "test_stream*", false);
verify(streamService, never()).delete(any(), any());

assertEquals(HttpStatus.NOT_FOUND, response.getStatus());
}

@Test
void shouldNotDeleteMultipleStreamsWhenNotOwner() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

KafkaStream stream1 = KafkaStream.builder()
.metadata(Metadata.builder()
.name("test_stream1")
.build())
.build();

KafkaStream stream2 = KafkaStream.builder()
.metadata(Metadata.builder()
.name("test_stream2")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));

when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1"))
.thenReturn(true);

when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream2"))
.thenReturn(false);

when(streamService.findByWildcardName(ns, "test_stream*"))
.thenReturn(List.of(stream1, stream2));

assertThrows(ResourceValidationException.class, () ->
streamController.bulkDelete("test", "test_stream*", false));
verify(streamService, never()).delete(any(), any());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.michelin.ns4kafka.model.Namespace.NamespaceSpec;
import com.michelin.ns4kafka.service.executor.AccessControlEntryAsyncExecutor;
import com.michelin.ns4kafka.validation.TopicValidator;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
Expand Down Expand Up @@ -163,5 +164,19 @@ void shouldVerifyCreationOfAcls() throws InterruptedException, ExecutionExceptio
assertEquals(1, aclTransactionalId.size());
assertTrue(aclTransactionalId.stream().findFirst().isPresent());
assertEquals(AclOperation.WRITE, aclTransactionalId.stream().findFirst().get().entry().operation());

ns4KafkaClient
.toBlocking()
.exchange(HttpRequest
.create(HttpMethod.DELETE, "/api/namespaces/nskafkastream/streams?name=kstream*")
.bearerAuth(token));

HttpResponse<List<KafkaStream>> streams = ns4KafkaClient
.toBlocking()
.exchange(HttpRequest
.create(HttpMethod.GET, "/api/namespaces/nskafkastream/streams")
.bearerAuth(token), Argument.listOf(KafkaStream.class));

assertEquals(0, streams.getBody().get().size());
}
}

0 comments on commit c50b6fd

Please sign in to comment.