Skip to content

Commit

Permalink
Handle wildcard parameter in Streams list API (#411)
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasCAI-mlv authored Jul 31, 2024
1 parent 9ff07e2 commit 58eff83
Show file tree
Hide file tree
Showing 6 changed files with 456 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ public class StreamController extends NamespacedResourceController {
StreamService streamService;

/**
* List Kafka Streams by namespace.
* List Kafka Streams by namespace, filtered by name parameter.
*
* @param namespace The namespace
* @param name The name parameter
* @return A list of Kafka Streams
*/
@Get("/")
List<KafkaStream> list(String namespace) {
return streamService.findAllForNamespace(getNamespace(namespace));
@Get
List<KafkaStream> list(String namespace, @QueryValue(defaultValue = "*") String name) {
return streamService.findByWildcardName(getNamespace(namespace), name);
}

/**
Expand All @@ -52,8 +53,10 @@ List<KafkaStream> list(String namespace) {
* @param namespace The name
* @param stream The Kafka Streams name
* @return The Kafka Streams
* @deprecated Use ${@link #list(String, String)}
*/
@Get("/{stream}")
@Deprecated(since = "1.12.0")
Optional<KafkaStream> get(String namespace, String stream) {
return streamService.findByName(getNamespace(namespace), stream);
}
Expand Down
18 changes: 17 additions & 1 deletion src/main/java/com/michelin/ns4kafka/service/StreamService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.michelin.ns4kafka.model.Namespace;
import com.michelin.ns4kafka.repository.StreamRepository;
import com.michelin.ns4kafka.service.executor.AccessControlEntryAsyncExecutor;
import com.michelin.ns4kafka.util.RegexUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Inject;
Expand All @@ -28,7 +29,7 @@ public class StreamService {
ApplicationContext applicationContext;

/**
* Find all Kafka Streams by given namespace.
* Find all Kafka Streams of a given namespace.
*
* @param namespace The namespace
* @return A list of Kafka Streams
Expand All @@ -39,6 +40,21 @@ public List<KafkaStream> findAllForNamespace(Namespace namespace) {
.toList();
}

/**
* Find all Kafka Streams of a given namespace, filtered by name parameter.
*
* @param namespace The namespace
* @param name The name filter
* @return A list of Kafka Streams
*/
public List<KafkaStream> findByWildcardName(Namespace namespace, String name) {
List<String> nameFilterPatterns = RegexUtils.wildcardStringsToRegexPatterns(List.of(name));
return findAllForNamespace(namespace)
.stream()
.filter(stream -> RegexUtils.filterByPattern(stream.getMetadata().getName(), nameFilterPatterns))
.toList();
}

/**
* Find a Kafka Streams by namespace and name.
*
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/com/michelin/ns4kafka/util/RegexUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.michelin.ns4kafka.util;

import java.util.List;
import java.util.regex.Pattern;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
* Regex utils.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class RegexUtils {
/**
* Convert wildcard strings list to regex patterns list.
*
* @param wildcardStrings The wildcard strings
* @return A list of regex patterns
*/
public static List<String> wildcardStringsToRegexPatterns(List<String> wildcardStrings) {
return wildcardStrings.stream()
.map(wildcardString -> "^" + wildcardString
.replace(".", "\\.")
.replace("*", ".*")
.replace("?", ".")
.replaceAll("^$", ".*") + "$")
.toList();
}

/**
* Check if a string matches any pattern of a given list.
*
* @param resourceName The string
* @param regexPatterns The regex patterns
* @return true if any regex pattern matches the resourceName, false otherwise
*/
public static boolean filterByPattern(String resourceName, List<String> regexPatterns) {
return regexPatterns.stream()
.anyMatch(pattern -> Pattern.compile(pattern).matcher(resourceName).matches());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,25 @@ class StreamControllerTest {
StreamController streamController;

@Test
void listEmptyStreams() {
void shouldListStreamsWhenEmpty() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(streamService.findAllForNamespace(ns))
when(streamService.findByWildcardName(ns, "*"))
.thenReturn(List.of());

List<KafkaStream> actual = streamController.list("test");
List<KafkaStream> actual = streamController.list("test", "*");
assertEquals(0, actual.size());
}

@Test
void listStreams() {
void shouldListStreamsWithWildcardParameter() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
Expand All @@ -85,17 +86,77 @@ void listStreams() {

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(streamService.findAllForNamespace(ns))
when(streamService.findByWildcardName(ns, "*"))
.thenReturn(List.of(stream1, stream2));

List<KafkaStream> actual = streamController.list("test");
List<KafkaStream> actual = streamController.list("test", "*");
assertEquals(2, actual.size());
assertTrue(actual.contains(stream1));
assertTrue(actual.contains(stream2));
}

@Test
void getEmpty() {
void shouldListStreamsWithNamedParameter() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

KafkaStream stream1 = KafkaStream.builder()
.metadata(Metadata.builder()
.name("prefix.s1")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(streamService.findByWildcardName(ns, "prefix.s1"))
.thenReturn(List.of(stream1));

List<KafkaStream> actual = streamController.list("test", "prefix.s1");

assertEquals(1, actual.size());
assertEquals("prefix.s1", actual.getFirst().getMetadata().getName());
}

@Test
void shouldListStreamsWithEmptyNameParameter() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

KafkaStream stream1 = KafkaStream.builder()
.metadata(Metadata.builder()
.name("prefix.s1")
.build())
.build();

KafkaStream stream2 = KafkaStream.builder()
.metadata(Metadata.builder()
.name("prefix.s2")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(streamService.findByWildcardName(ns, ""))
.thenReturn(List.of(stream1, stream2));

List<KafkaStream> actual = streamController.list("test", "");

assertEquals(2, actual.size());
assertEquals("prefix.s1", actual.get(0).getMetadata().getName());
assertEquals("prefix.s2", actual.get(1).getMetadata().getName());
}

@Test
@SuppressWarnings("deprecation")
void shouldGetStreamsWhenEmpty() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
Expand All @@ -114,7 +175,8 @@ void getEmpty() {
}

@Test
void getStreamFound() {
@SuppressWarnings("deprecation")
void shouldGetStreams() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
Expand All @@ -140,7 +202,7 @@ void getStreamFound() {
}

@Test
void createStreamSuccess() {
void shouldCreateStreams() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
Expand All @@ -162,8 +224,10 @@ void createStreamSuccess() {

when(streamService.findByName(ns, "test_stream1"))
.thenReturn(Optional.empty());
when(securityService.username()).thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
when(securityService.username())
.thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN))
.thenReturn(false);
doNothing().when(applicationEventPublisher).publishEvent(any());

when(streamService.create(stream1))
Expand All @@ -176,7 +240,7 @@ void createStreamSuccess() {
}

@Test
void createStreamSuccessDryRun() {
void shouldCreateStreamsInDryRunMode() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
Expand Down Expand Up @@ -207,7 +271,7 @@ void createStreamSuccessDryRun() {
}

@Test
void updateStreamUnchanged() {
void shouldUpdateStreamsUnchanged() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
Expand Down Expand Up @@ -238,7 +302,7 @@ void updateStreamUnchanged() {
}

@Test
void createStreamValidationError() {
void shouldNotCreateStreamsWhenValidationErrors() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
Expand All @@ -263,15 +327,15 @@ void createStreamValidationError() {
}

@Test
void deleteStreamSuccess() {
void shouldDeleteStreams() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

KafkaStream stream1 = KafkaStream.builder()
KafkaStream stream = KafkaStream.builder()
.metadata(Metadata.builder()
.name("test_stream1")
.build())
Expand All @@ -284,18 +348,18 @@ void deleteStreamSuccess() {
.thenReturn(true);

when(streamService.findByName(ns, "test_stream1"))
.thenReturn(Optional.of(stream1));
.thenReturn(Optional.of(stream));

when(securityService.username()).thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
doNothing().when(applicationEventPublisher).publishEvent(any());
doNothing().when(streamService).delete(ns, stream1);
doNothing().when(streamService).delete(ns, stream);
var response = streamController.delete("test", "test_stream1", false);
assertEquals(HttpStatus.NO_CONTENT, response.getStatus());
}

@Test
void deleteStreamSuccessDryRun() {
void shouldDeleteStreamsInDryRunMode() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
Expand Down Expand Up @@ -324,7 +388,7 @@ void deleteStreamSuccessDryRun() {
}

@Test
void deleteStreamNotFound() {
void shouldNotDeleteStreamsWhenNotFound() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
Expand All @@ -347,7 +411,7 @@ void deleteStreamNotFound() {
}

@Test
void deleteStreamNotOwner() {
void shouldNotDeleteStreamsWhenNotOwner() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
Expand All @@ -364,4 +428,4 @@ void deleteStreamNotOwner() {
assertThrows(ResourceValidationException.class, () -> streamController.delete("test", "test_stream1", false));
verify(streamService, never()).delete(any(), any());
}
}
}
Loading

0 comments on commit 58eff83

Please sign in to comment.