From b71928b4196294fe2349a00ac83d1ca8ca47a178 Mon Sep 17 00:00:00 2001 From: E046899 Date: Mon, 4 Dec 2023 10:59:59 +0100 Subject: [PATCH 01/15] Improve tags managment --- .../ns4kafka/services/TopicService.java | 31 ++++++++++++++++--- .../clients/schema/SchemaRegistryClient.java | 21 +++++++++++-- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/services/TopicService.java b/src/main/java/com/michelin/ns4kafka/services/TopicService.java index fb8479a5..1dcb9343 100644 --- a/src/main/java/com/michelin/ns4kafka/services/TopicService.java +++ b/src/main/java/com/michelin/ns4kafka/services/TopicService.java @@ -48,6 +48,8 @@ public class TopicService { @Inject SchemaRegistryClient schemaRegistryClient; + public static final String DYNAMIC_TAGS_CREATION = "dynamic.tags.creation"; + /** * Find all topics. * @@ -351,6 +353,30 @@ public List validateTags(Namespace namespace, Topic topic) { Set tagNames = schemaRegistryClient.getTags(namespace.getMetadata().getCluster()) .map(tags -> tags.stream().map(TagInfo::name).collect(Collectors.toSet())).block(); + List unavailableTagNames = topic.getSpec().getTags() + .stream() + .filter(tagName -> tagNames != null && !tagNames.contains(tagName)) + .toList(); + + if(topicCluster.isPresent() && + Boolean.parseBoolean( + topicCluster.get().getConfig().getProperty( + DYNAMIC_TAGS_CREATION, + "true"))) { + + List tagsToCreate = unavailableTagNames + .stream() + .map(TagInfo::new) + .collect(Collectors.toList()); + + schemaRegistryClient.createTags( + tagsToCreate, + namespace.getMetadata().getCluster()) + .block(); + + return validationErrors; + } + if (tagNames == null || tagNames.isEmpty()) { validationErrors.add(String.format( "Invalid value %s for tags: No tags allowed.", @@ -358,11 +384,6 @@ public List validateTags(Namespace namespace, Topic topic) { return validationErrors; } - List unavailableTagNames = topic.getSpec().getTags() - .stream() - .filter(tagName -> !tagNames.contains(tagName)) - .toList(); - if (!unavailableTagNames.isEmpty()) { validationErrors.add(String.format( "Invalid value %s for tags: Available tags are %s.", diff --git a/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java b/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java index dcb2c7a5..87d6132e 100644 --- a/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java +++ b/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java @@ -186,9 +186,9 @@ public Mono deleteCurrentCompatibilityBySubject(Str public Mono> getTags(String kafkaCluster) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); HttpRequest request = HttpRequest - .GET(URI.create(StringUtils.prependUri( - config.getUrl(), "/catalog/v1/types/tagdefs"))) - .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + .GET(URI.create(StringUtils.prependUri( + config.getUrl(), "/catalog/v1/types/tagdefs"))) + .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class))); } @@ -225,6 +225,21 @@ public Mono> addTags(String kafkaCluster, List return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class))); } + /** + * Create tags. + * + * @param tags The list of tags to create + * @param kafkaCluster The Kafka cluster + * @return Information about created tags + */ + public Mono> createTags(List tags, String kafkaCluster) { + ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); + HttpRequest request = HttpRequest.POST(URI.create(StringUtils.prependUri( + config.getUrl(), "/catalog/v1/types/tagdefs")), tags) + .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class))); + } + /** * Delete a tag to a topic. * From 46bb2f1807328428a731276b56fa296fd5e51d68 Mon Sep 17 00:00:00 2001 From: E046899 Date: Mon, 4 Dec 2023 11:08:08 +0100 Subject: [PATCH 02/15] Improve tags managment --- README.md | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 42e2dc32..76f6e4a8 100644 --- a/README.md +++ b/README.md @@ -16,22 +16,24 @@ using [Kafkactl](https://github.com/michelin/kafkactl), which follows best pract ## Table of Contents -* [Principles](#principles) - * [Namespace Isolation](#namespace-isolation) - * [Desired State](#desired-state) - * [Server Side Validation](#server-side-validation) - * [CLI](#cli) -* [Download](#download) -* [Install](#install) -* [Demo Environment](#demo-environment) -* [Configuration](#configuration) - * [GitLab Authentication](#gitlab-authentication) - * [Admin Account](#admin-account) - * [Kafka Broker Authentication](#kafka-broker-authentication) - * [Managed clusters](#managed-clusters) - * [AKHQ](#akhq) -* [Administration](#administration) -* [Contribution](#contribution) +- [Ns4Kafka](#ns4kafka) + - [Table of Contents](#table-of-contents) + - [Principles](#principles) + - [Namespace Isolation](#namespace-isolation) + - [Desired State](#desired-state) + - [Server Side Validation](#server-side-validation) + - [CLI](#cli) + - [Download](#download) + - [Install](#install) + - [Demo Environment](#demo-environment) + - [Configuration](#configuration) + - [GitLab Authentication](#gitlab-authentication) + - [Admin Account](#admin-account) + - [Kafka Broker Authentication](#kafka-broker-authentication) + - [Managed clusters](#managed-clusters) + - [AKHQ](#akhq) + - [Administration](#administration) + - [Contribution](#contribution) ## Principles @@ -213,6 +215,7 @@ of your namespace descriptors. | provider | boolean | The kind of cluster. Either SELF_MANAGED or CONFLUENT_CLOUD | | config.bootstrap.servers | string | The location of the clusters servers | | config.cluster.id | string | The cluster id. Required to use [Confluent Cloud tags](https://docs.confluent.io/cloud/current/stream-governance/stream-catalog.html). | +| dynamic.tags.creation | boolean | By default it's true so tags are created when linked to a topic. Otherwise, tags linked to topics must belong to the existing tag list in Confluent. | | schema-registry.url | string | The location of the Schema Registry | | schema-registry.basicAuthUsername | string | Basic authentication username to the Schema Registry | | schema-registry.basicAuthPassword | string | Basic authentication password to the Schema Registry | From df7c18074d2c3b77e25fa335816328608b1506ef Mon Sep 17 00:00:00 2001 From: E046899 Date: Mon, 4 Dec 2023 11:35:21 +0100 Subject: [PATCH 03/15] Improve tags managment --- .../java/com/michelin/ns4kafka/services/TopicService.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/services/TopicService.java b/src/main/java/com/michelin/ns4kafka/services/TopicService.java index 1dcb9343..fee0d95a 100644 --- a/src/main/java/com/michelin/ns4kafka/services/TopicService.java +++ b/src/main/java/com/michelin/ns4kafka/services/TopicService.java @@ -358,12 +358,11 @@ public List validateTags(Namespace namespace, Topic topic) { .filter(tagName -> tagNames != null && !tagNames.contains(tagName)) .toList(); - if(topicCluster.isPresent() && - Boolean.parseBoolean( + if (topicCluster.isPresent() + && Boolean.parseBoolean( topicCluster.get().getConfig().getProperty( DYNAMIC_TAGS_CREATION, "true"))) { - List tagsToCreate = unavailableTagNames .stream() .map(TagInfo::new) From 004ee42991560d831acbe53cdcfcb2e229c1ebfc Mon Sep 17 00:00:00 2001 From: E046899 Date: Mon, 4 Dec 2023 15:04:44 +0100 Subject: [PATCH 04/15] Improve tags managment --- .../ns4kafka/services/TopicServiceTest.java | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java index fa277b11..62822365 100644 --- a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.stream.Stream; @@ -900,9 +901,14 @@ void shouldTagsBeValid() { .build(); List tagInfo = List.of(TagInfo.builder().name("TAG_TEST").build()); + ManagedClusterProperties managedClusterProps = + new ManagedClusterProperties("local", + ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); + Properties properties = new Properties(); + properties.put(TopicService.DYNAMIC_TAGS_CREATION, "false"); + managedClusterProps.setConfig(properties); - when(managedClusterProperties.stream()).thenReturn(Stream.of( - new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD))); + when(managedClusterProperties.stream()).thenReturn(Stream.of(managedClusterProps)); when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(tagInfo)); List validationErrors = topicService.validateTags(ns, topic); @@ -947,8 +953,14 @@ void shouldTagsBeInvalidWhenNoTagsAllowed() { .tags(List.of("TAG_TEST")).build()) .build(); - when(managedClusterProperties.stream()).thenReturn(Stream.of( - new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD))); + ManagedClusterProperties managedClusterProps = + new ManagedClusterProperties("local", + ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); + Properties properties = new Properties(); + properties.put(TopicService.DYNAMIC_TAGS_CREATION, "false"); + managedClusterProps.setConfig(properties); + + when(managedClusterProperties.stream()).thenReturn(Stream.of(managedClusterProps)); when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(Collections.emptyList())); List validationErrors = topicService.validateTags(ns, topic); @@ -974,11 +986,14 @@ void shouldTagsBeInvalidWhenNotAllowed() { .build(); List tagInfo = List.of(TagInfo.builder().name("TAG_TEST").build()); - - when(managedClusterProperties.stream()) - .thenReturn(Stream.of( + ManagedClusterProperties managedClusterProps = new ManagedClusterProperties("local", - ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD))); + ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); + Properties properties = new Properties(); + properties.put(TopicService.DYNAMIC_TAGS_CREATION, "false"); + managedClusterProps.setConfig(properties); + + when(managedClusterProperties.stream()).thenReturn(Stream.of(managedClusterProps)); when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(tagInfo)); List validationErrors = topicService.validateTags(ns, topic); From 5d00725a68fa1cf997ef5476c08e78b2f0ac27bd Mon Sep 17 00:00:00 2001 From: E046899 Date: Mon, 4 Dec 2023 17:03:48 +0100 Subject: [PATCH 05/15] improve tags managment --- .../ns4kafka/services/TopicServiceTest.java | 75 +++++++------------ 1 file changed, 27 insertions(+), 48 deletions(-) diff --git a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java index 62822365..cef13fda 100644 --- a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java @@ -887,20 +887,6 @@ void findAll() { @Test void shouldTagsBeValid() { - Namespace ns = Namespace.builder() - .metadata(ObjectMeta.builder() - .name("namespace") - .cluster("local") - .build()) - .build(); - - Topic topic = Topic.builder() - .metadata(ObjectMeta.builder().name("ns-topic1").build()) - .spec(Topic.TopicSpec.builder() - .tags(List.of("TAG_TEST")).build()) - .build(); - - List tagInfo = List.of(TagInfo.builder().name("TAG_TEST").build()); ManagedClusterProperties managedClusterProps = new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); @@ -909,9 +895,15 @@ void shouldTagsBeValid() { managedClusterProps.setConfig(properties); when(managedClusterProperties.stream()).thenReturn(Stream.of(managedClusterProps)); - when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(tagInfo)); - - List validationErrors = topicService.validateTags(ns, topic); + when(schemaRegistryClient.getTags("local")) + .thenReturn(Mono.just(List.of(TagInfo.builder().name("TAG_TEST").build()))); + + List validationErrors = topicService.validateTags( + Namespace.builder().metadata( + ObjectMeta.builder().name("namespace").cluster("local").build()).build(), + Topic.builder().metadata( + ObjectMeta.builder().name("ns-topic1").build()).spec(Topic.TopicSpec.builder() + .tags(List.of("TAG_TEST")).build()).build()); assertEquals(0, validationErrors.size()); } @@ -940,19 +932,6 @@ void shouldTagsBeInvalidWhenNotConfluentCloud() { @Test void shouldTagsBeInvalidWhenNoTagsAllowed() { - Namespace ns = Namespace.builder() - .metadata(ObjectMeta.builder() - .name("namespace") - .cluster("local") - .build()) - .build(); - - Topic topic = Topic.builder() - .metadata(ObjectMeta.builder().name("ns-topic1").build()) - .spec(Topic.TopicSpec.builder() - .tags(List.of("TAG_TEST")).build()) - .build(); - ManagedClusterProperties managedClusterProps = new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); @@ -963,7 +942,14 @@ void shouldTagsBeInvalidWhenNoTagsAllowed() { when(managedClusterProperties.stream()).thenReturn(Stream.of(managedClusterProps)); when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(Collections.emptyList())); - List validationErrors = topicService.validateTags(ns, topic); + List validationErrors = topicService.validateTags( + Namespace.builder().metadata(ObjectMeta.builder() + .name("namespace").cluster("local").build()).build(), + Topic.builder() + .metadata(ObjectMeta.builder().name("ns-topic1").build()) + .spec(Topic.TopicSpec.builder() + .tags(List.of("TAG_TEST")).build()) + .build()); assertEquals(1, validationErrors.size()); assertEquals( "Invalid value TAG_TEST for tags: No tags allowed.", @@ -972,20 +958,6 @@ void shouldTagsBeInvalidWhenNoTagsAllowed() { @Test void shouldTagsBeInvalidWhenNotAllowed() { - Namespace ns = Namespace.builder() - .metadata(ObjectMeta.builder() - .name("namespace") - .cluster("local") - .build()) - .build(); - - Topic topic = Topic.builder() - .metadata(ObjectMeta.builder().name("ns-topic1").build()) - .spec(Topic.TopicSpec.builder() - .tags(List.of("BAD_TAG", "TAG_TEST")).build()) - .build(); - - List tagInfo = List.of(TagInfo.builder().name("TAG_TEST").build()); ManagedClusterProperties managedClusterProps = new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); @@ -994,9 +966,16 @@ void shouldTagsBeInvalidWhenNotAllowed() { managedClusterProps.setConfig(properties); when(managedClusterProperties.stream()).thenReturn(Stream.of(managedClusterProps)); - when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(tagInfo)); - - List validationErrors = topicService.validateTags(ns, topic); + when(schemaRegistryClient.getTags("local")) + .thenReturn(Mono.just(List.of(TagInfo.builder().name("TAG_TEST").build()))); + + List validationErrors = topicService.validateTags( + Namespace.builder().metadata( + ObjectMeta.builder().name("namespace").cluster("local").build()).build(), + Topic.builder() + .metadata(ObjectMeta.builder().name("ns-topic1").build()) + .spec(Topic.TopicSpec.builder() + .tags(List.of("BAD_TAG", "TAG_TEST")).build()).build()); assertEquals(1, validationErrors.size()); assertEquals("Invalid value BAD_TAG for tags: Available tags are TAG_TEST.", validationErrors.get(0)); } From 287ca0e2468ec9d93a868b962b2c12b1dc1b1341 Mon Sep 17 00:00:00 2001 From: E046899 Date: Mon, 4 Dec 2023 17:17:42 +0100 Subject: [PATCH 06/15] improve tags managment --- .../ns4kafka/services/TopicServiceTest.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java index cef13fda..8a9c0d36 100644 --- a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java @@ -4,6 +4,8 @@ import static org.junit.jupiter.api.Assertions.assertLinesMatch; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.when; import com.michelin.ns4kafka.models.AccessControlEntry; @@ -907,6 +909,30 @@ void shouldTagsBeValid() { assertEquals(0, validationErrors.size()); } + @Test + void shouldTagsBeCreated() { + ManagedClusterProperties managedClusterProps = + new ManagedClusterProperties("local", + ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); + Properties properties = new Properties(); + properties.put(TopicService.DYNAMIC_TAGS_CREATION, "true"); + managedClusterProps.setConfig(properties); + + when(managedClusterProperties.stream()).thenReturn(Stream.of(managedClusterProps)); + when(schemaRegistryClient.getTags("local")) + .thenReturn(Mono.just(List.of(TagInfo.builder().name("TAG_TEST").build()))); + when(schemaRegistryClient.createTags(anyList(), anyString())) + .thenReturn(Mono.just(List.of(TagInfo.builder().name("TAG_TEST").build()))); + + List validationErrors = topicService.validateTags( + Namespace.builder().metadata( + ObjectMeta.builder().name("namespace").cluster("local").build()).build(), + Topic.builder().metadata( + ObjectMeta.builder().name("ns-topic1").build()).spec(Topic.TopicSpec.builder() + .tags(List.of("TAG_TEST")).build()).build()); + assertEquals(0, validationErrors.size()); + } + @Test void shouldTagsBeInvalidWhenNotConfluentCloud() { Namespace ns = Namespace.builder() From 893fe19c45731905d537daf125394434c8fd02cd Mon Sep 17 00:00:00 2001 From: E046899 Date: Mon, 4 Dec 2023 17:22:01 +0100 Subject: [PATCH 07/15] improve tags managment --- src/main/java/com/michelin/ns4kafka/services/TopicService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/michelin/ns4kafka/services/TopicService.java b/src/main/java/com/michelin/ns4kafka/services/TopicService.java index fee0d95a..f7b74f0e 100644 --- a/src/main/java/com/michelin/ns4kafka/services/TopicService.java +++ b/src/main/java/com/michelin/ns4kafka/services/TopicService.java @@ -366,7 +366,7 @@ public List validateTags(Namespace namespace, Topic topic) { List tagsToCreate = unavailableTagNames .stream() .map(TagInfo::new) - .collect(Collectors.toList()); + .toList(); schemaRegistryClient.createTags( tagsToCreate, From 1b81699fe11554134d6538eb0d6ec94bf2d4d374 Mon Sep 17 00:00:00 2001 From: E046899 Date: Thu, 7 Dec 2023 14:12:56 +0100 Subject: [PATCH 08/15] manage TAG in upper case only --- .../com/michelin/ns4kafka/controllers/topic/TopicController.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java b/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java index 89d63d66..e529fe4f 100644 --- a/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java +++ b/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java @@ -108,6 +108,7 @@ public HttpResponse apply(String namespace, @Valid @Body Topic topic, validationErrors.addAll(topicService.validateTopicUpdate(ns, existingTopic.get(), topic)); } + topic.getSpec().getTags().replaceAll(String::toUpperCase); List existingTags = existingTopic .map(oldTopic -> oldTopic.getSpec().getTags()) .orElse(Collections.emptyList()); From 0f17f5a365a41a269926193fbb66bcf1cd839993 Mon Sep 17 00:00:00 2001 From: E046899 Date: Thu, 7 Dec 2023 15:24:06 +0100 Subject: [PATCH 09/15] fix tu --- .../com/michelin/ns4kafka/controllers/TopicControllerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/michelin/ns4kafka/controllers/TopicControllerTest.java b/src/test/java/com/michelin/ns4kafka/controllers/TopicControllerTest.java index 3854ce35..4647e989 100644 --- a/src/test/java/com/michelin/ns4kafka/controllers/TopicControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controllers/TopicControllerTest.java @@ -434,7 +434,7 @@ void shouldNotValidateTagsWhenNoNewTag() throws InterruptedException, ExecutionE .name("test.topic") .build()) .spec(Topic.TopicSpec.builder() - .tags(List.of("TAG1")) + .tags(Arrays.asList("TAG1")) .replicationFactor(3) .partitions(3) .configs(Map.of("cleanup.policy", "delete", From 29beac56551c16de5069a56a7bdf7eec98f669ac Mon Sep 17 00:00:00 2001 From: E046899 Date: Thu, 14 Dec 2023 17:26:18 +0100 Subject: [PATCH 10/15] better managment of tags --- .../ns4kafka/services/TopicService.java | 40 ------------- .../clients/schema/SchemaRegistryClient.java | 4 +- .../executors/TopicAsyncExecutor.java | 57 ++++++++++++------- .../executors/TopicAsyncExecutorTest.java | 18 +++--- 4 files changed, 47 insertions(+), 72 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/services/TopicService.java b/src/main/java/com/michelin/ns4kafka/services/TopicService.java index f7b74f0e..16f01469 100644 --- a/src/main/java/com/michelin/ns4kafka/services/TopicService.java +++ b/src/main/java/com/michelin/ns4kafka/services/TopicService.java @@ -350,46 +350,6 @@ public List validateTags(Namespace namespace, Topic topic) { return validationErrors; } - Set tagNames = schemaRegistryClient.getTags(namespace.getMetadata().getCluster()) - .map(tags -> tags.stream().map(TagInfo::name).collect(Collectors.toSet())).block(); - - List unavailableTagNames = topic.getSpec().getTags() - .stream() - .filter(tagName -> tagNames != null && !tagNames.contains(tagName)) - .toList(); - - if (topicCluster.isPresent() - && Boolean.parseBoolean( - topicCluster.get().getConfig().getProperty( - DYNAMIC_TAGS_CREATION, - "true"))) { - List tagsToCreate = unavailableTagNames - .stream() - .map(TagInfo::new) - .toList(); - - schemaRegistryClient.createTags( - tagsToCreate, - namespace.getMetadata().getCluster()) - .block(); - - return validationErrors; - } - - if (tagNames == null || tagNames.isEmpty()) { - validationErrors.add(String.format( - "Invalid value %s for tags: No tags allowed.", - String.join(", ", topic.getSpec().getTags()))); - return validationErrors; - } - - if (!unavailableTagNames.isEmpty()) { - validationErrors.add(String.format( - "Invalid value %s for tags: Available tags are %s.", - String.join(", ", unavailableTagNames), - String.join(", ", tagNames))); - } - return validationErrors; } } diff --git a/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java b/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java index 87d6132e..114f5ff3 100644 --- a/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java +++ b/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java @@ -215,7 +215,7 @@ public Mono getTopicWithTags(String kafkaCluster) { * @param tagSpecs Tags to add * @return Information about added tags */ - public Mono> addTags(String kafkaCluster, List tagSpecs) { + public Mono> associateTags(String kafkaCluster, List tagSpecs) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); HttpRequest request = HttpRequest .POST(URI.create(StringUtils.prependUri( @@ -248,7 +248,7 @@ public Mono> createTags(List tags, String kafkaCluster) { * @param tagName The tag to delete * @return The resume response */ - public Mono> deleteTag(String kafkaCluster, String entityName, String tagName) { + public Mono> dissociateTag(String kafkaCluster, String entityName, String tagName) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); HttpRequest request = HttpRequest .DELETE(URI.create(StringUtils.prependUri( diff --git a/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java index 3a0ba80d..577d0491 100644 --- a/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java +++ b/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java @@ -8,6 +8,7 @@ import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient; import com.michelin.ns4kafka.services.clients.schema.entities.TagEntities; import com.michelin.ns4kafka.services.clients.schema.entities.TagEntity; +import com.michelin.ns4kafka.services.clients.schema.entities.TagInfo; import com.michelin.ns4kafka.services.clients.schema.entities.TagTopicInfo; import io.micronaut.context.annotation.EachBean; import jakarta.inject.Singleton; @@ -154,7 +155,7 @@ public void alterTags(List ns4kafkaTopics, Map brokerTopic // Get tags to delete Set existingTags = new HashSet<>(brokerTopic.getSpec().getTags()); existingTags.removeAll(Set.copyOf(topic.getSpec().getTags())); - deleteTags(existingTags, topic.getMetadata().getName()); + dissociateTags(existingTags, topic.getMetadata().getName()); // Get tags to create Set newTags = new HashSet<>(topic.getSpec().getTags()); @@ -173,7 +174,7 @@ public void alterTags(List ns4kafkaTopics, Map brokerTopic .toList(); if (!tagsToCreate.isEmpty()) { - createTags(tagsToCreate); + createAndAssociateTags(tagsToCreate); } } @@ -191,7 +192,7 @@ public void deleteTopic(Topic topic) throws InterruptedException, ExecutionExcep managedClusterProperties.getName()); if (isConfluentCloud() && !topic.getSpec().getTags().isEmpty()) { - deleteTags(topic.getSpec().getTags(), topic.getMetadata().getName()); + dissociateTags(topic.getSpec().getTags(), topic.getMetadata().getName()); } } @@ -378,37 +379,51 @@ private void createTopics(List topics) { } /** - * Create tags. + * Create tags and associate them. * - * @param tagsToCreate The tags to create + * @param tagsToAssociate The tags to create and associate */ - private void createTags(List tagsToCreate) { - String stringTags = String.join(", ", tagsToCreate - .stream() - .map(Record::toString) - .toList()); + private void createAndAssociateTags(List tagsToAssociate) { + List tagsToCreate = tagsToAssociate + .stream() + .map(tag -> TagInfo + .builder() + .name(tag.typeName()) + .build()) + .toList(); - schemaRegistryClient.addTags(managedClusterProperties.getName(), tagsToCreate) - .subscribe(success -> log.info(String.format("Success creating tag %s.", stringTags)), - error -> log.error(String.format("Error creating tag %s.", stringTags), error)); + String stringTags = String.join(", ", tagsToAssociate + .stream() + .map(Record::toString) + .toList()); + + schemaRegistryClient.createTags(tagsToCreate, managedClusterProperties.getName()) + .subscribe(successCreation -> { + schemaRegistryClient.associateTags(managedClusterProperties.getName(), tagsToAssociate) + .subscribe( + successAssociation -> + log.info(String.format("Success associating tag %s.", stringTags)), + error -> + log.error(String.format("Error associating tag %s.", stringTags), error)); + }, error -> log.error(String.format("Error creating tag %s.", stringTags), error)); } /** - * Delete tags. + * Dissociate tags to a topic. * - * @param tagsToDelete The tags to delete - * @param topicName The topic name + * @param tagsToDissociate The tags to dissociate + * @param topicName The topic name */ - private void deleteTags(Collection tagsToDelete, String topicName) { - tagsToDelete - .forEach(tag -> schemaRegistryClient.deleteTag(managedClusterProperties.getName(), + private void dissociateTags(Collection tagsToDissociate, String topicName) { + tagsToDissociate + .forEach(tag -> schemaRegistryClient.dissociateTag(managedClusterProperties.getName(), managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":" + topicName, tag) - .subscribe(success -> log.info(String.format("Success deleting tag %s.", + .subscribe(success -> log.info(String.format("Success dissociating tag %s.", managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":" + topicName + "/" + tag)), - error -> log.error(String.format("Error deleting tag %s.", + error -> log.error(String.format("Error dissociating tag %s.", managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":" + topicName + "/" + tag), error))); diff --git a/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java index aa5e26a2..c044a210 100644 --- a/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java @@ -67,7 +67,7 @@ void shouldDeleteTagsAndNotCreateIfEmpty() { Properties properties = new Properties(); properties.put(CLUSTER_ID, CLUSTER_ID_TEST); - when(schemaRegistryClient.deleteTag(anyString(), + when(schemaRegistryClient.dissociateTag(anyString(), anyString(), anyString())) .thenReturn(Mono.empty()) .thenReturn(Mono.error(new Exception("error"))); @@ -96,8 +96,8 @@ void shouldDeleteTagsAndNotCreateIfEmpty() { topicAsyncExecutor.alterTags(ns4kafkaTopics, brokerTopics); - verify(schemaRegistryClient).deleteTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG2); - verify(schemaRegistryClient).deleteTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG3); + verify(schemaRegistryClient).dissociateTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG2); + verify(schemaRegistryClient).dissociateTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG3); } @Test @@ -105,7 +105,7 @@ void shouldCreateTags() { Properties properties = new Properties(); properties.put(CLUSTER_ID, CLUSTER_ID_TEST); - when(schemaRegistryClient.addTags(anyString(), anyList())) + when(schemaRegistryClient.associateTags(anyString(), anyList())) .thenReturn(Mono.empty()) .thenReturn(Mono.error(new Exception("error"))); when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER); @@ -132,7 +132,7 @@ void shouldCreateTags() { topicAsyncExecutor.alterTags(ns4kafkaTopics, brokerTopics); - verify(schemaRegistryClient).addTags(eq(LOCAL_CLUSTER), argThat(tags -> + verify(schemaRegistryClient).associateTags(eq(LOCAL_CLUSTER), argThat(tags -> tags.get(0).entityName().equals(CLUSTER_ID_TEST + ":" + TOPIC_NAME) && tags.get(0).typeName().equals(TAG1) && tags.get(0).entityType().equals(TOPIC_ENTITY_TYPE))); @@ -161,7 +161,7 @@ void shouldDeleteTopicNoTags() throws ExecutionException, InterruptedException, topicAsyncExecutor.deleteTopic(topic); - verify(schemaRegistryClient, never()).deleteTag(any(), any(), any()); + verify(schemaRegistryClient, never()).dissociateTag(any(), any(), any()); } @Test @@ -181,7 +181,7 @@ void shouldDeleteTopicSelfManagedCluster() throws ExecutionException, Interrupte topicAsyncExecutor.deleteTopic(topic); - verify(schemaRegistryClient, never()).deleteTag(any(), any(), any()); + verify(schemaRegistryClient, never()).dissociateTag(any(), any(), any()); } @Test @@ -195,7 +195,7 @@ void shouldDeleteTopicAndTags() throws ExecutionException, InterruptedException, when(managedClusterProperties.getAdminClient()).thenReturn(adminClient); when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER); when(managedClusterProperties.getConfig()).thenReturn(properties); - when(schemaRegistryClient.deleteTag(anyString(), + when(schemaRegistryClient.dissociateTag(anyString(), anyString(), anyString())) .thenReturn(Mono.just(HttpResponse.ok())) .thenReturn(Mono.error(new Exception("error"))); @@ -211,7 +211,7 @@ void shouldDeleteTopicAndTags() throws ExecutionException, InterruptedException, topicAsyncExecutor.deleteTopic(topic); - verify(schemaRegistryClient).deleteTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG1); + verify(schemaRegistryClient).dissociateTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG1); } @Test From e8d4b3f6a96d71eb3e3adcb50cd145ea63c3c6ec Mon Sep 17 00:00:00 2001 From: E046899 Date: Thu, 14 Dec 2023 17:39:27 +0100 Subject: [PATCH 11/15] better managment of tags --- README.md | 1 - .../ns4kafka/services/TopicService.java | 5 -- .../ns4kafka/services/TopicServiceTest.java | 62 ------------------- 3 files changed, 68 deletions(-) diff --git a/README.md b/README.md index 76f6e4a8..f65f6c14 100644 --- a/README.md +++ b/README.md @@ -215,7 +215,6 @@ of your namespace descriptors. | provider | boolean | The kind of cluster. Either SELF_MANAGED or CONFLUENT_CLOUD | | config.bootstrap.servers | string | The location of the clusters servers | | config.cluster.id | string | The cluster id. Required to use [Confluent Cloud tags](https://docs.confluent.io/cloud/current/stream-governance/stream-catalog.html). | -| dynamic.tags.creation | boolean | By default it's true so tags are created when linked to a topic. Otherwise, tags linked to topics must belong to the existing tag list in Confluent. | | schema-registry.url | string | The location of the Schema Registry | | schema-registry.basicAuthUsername | string | Basic authentication username to the Schema Registry | | schema-registry.basicAuthPassword | string | Basic authentication password to the Schema Registry | diff --git a/src/main/java/com/michelin/ns4kafka/services/TopicService.java b/src/main/java/com/michelin/ns4kafka/services/TopicService.java index 16f01469..27480da2 100644 --- a/src/main/java/com/michelin/ns4kafka/services/TopicService.java +++ b/src/main/java/com/michelin/ns4kafka/services/TopicService.java @@ -45,11 +45,6 @@ public class TopicService { @Inject List managedClusterProperties; - @Inject - SchemaRegistryClient schemaRegistryClient; - - public static final String DYNAMIC_TAGS_CREATION = "dynamic.tags.creation"; - /** * Find all topics. * diff --git a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java index 8a9c0d36..224cf0a5 100644 --- a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java @@ -4,8 +4,6 @@ import static org.junit.jupiter.api.Assertions.assertLinesMatch; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.when; import com.michelin.ns4kafka.models.AccessControlEntry; @@ -16,7 +14,6 @@ import com.michelin.ns4kafka.properties.ManagedClusterProperties; import com.michelin.ns4kafka.repositories.TopicRepository; import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient; -import com.michelin.ns4kafka.services.clients.schema.entities.TagInfo; import com.michelin.ns4kafka.services.executors.TopicAsyncExecutor; import io.micronaut.context.ApplicationContext; import io.micronaut.inject.qualifiers.Qualifiers; @@ -36,7 +33,6 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import reactor.core.publisher.Mono; @ExtendWith(MockitoExtension.class) class TopicServiceTest { @@ -893,12 +889,9 @@ void shouldTagsBeValid() { new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); Properties properties = new Properties(); - properties.put(TopicService.DYNAMIC_TAGS_CREATION, "false"); managedClusterProps.setConfig(properties); when(managedClusterProperties.stream()).thenReturn(Stream.of(managedClusterProps)); - when(schemaRegistryClient.getTags("local")) - .thenReturn(Mono.just(List.of(TagInfo.builder().name("TAG_TEST").build()))); List validationErrors = topicService.validateTags( Namespace.builder().metadata( @@ -915,14 +908,9 @@ void shouldTagsBeCreated() { new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); Properties properties = new Properties(); - properties.put(TopicService.DYNAMIC_TAGS_CREATION, "true"); managedClusterProps.setConfig(properties); when(managedClusterProperties.stream()).thenReturn(Stream.of(managedClusterProps)); - when(schemaRegistryClient.getTags("local")) - .thenReturn(Mono.just(List.of(TagInfo.builder().name("TAG_TEST").build()))); - when(schemaRegistryClient.createTags(anyList(), anyString())) - .thenReturn(Mono.just(List.of(TagInfo.builder().name("TAG_TEST").build()))); List validationErrors = topicService.validateTags( Namespace.builder().metadata( @@ -955,54 +943,4 @@ void shouldTagsBeInvalidWhenNotConfluentCloud() { assertEquals(1, validationErrors.size()); assertEquals("Invalid value TAG_TEST for tags: Tags are not currently supported.", validationErrors.get(0)); } - - @Test - void shouldTagsBeInvalidWhenNoTagsAllowed() { - ManagedClusterProperties managedClusterProps = - new ManagedClusterProperties("local", - ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); - Properties properties = new Properties(); - properties.put(TopicService.DYNAMIC_TAGS_CREATION, "false"); - managedClusterProps.setConfig(properties); - - when(managedClusterProperties.stream()).thenReturn(Stream.of(managedClusterProps)); - when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(Collections.emptyList())); - - List validationErrors = topicService.validateTags( - Namespace.builder().metadata(ObjectMeta.builder() - .name("namespace").cluster("local").build()).build(), - Topic.builder() - .metadata(ObjectMeta.builder().name("ns-topic1").build()) - .spec(Topic.TopicSpec.builder() - .tags(List.of("TAG_TEST")).build()) - .build()); - assertEquals(1, validationErrors.size()); - assertEquals( - "Invalid value TAG_TEST for tags: No tags allowed.", - validationErrors.get(0)); - } - - @Test - void shouldTagsBeInvalidWhenNotAllowed() { - ManagedClusterProperties managedClusterProps = - new ManagedClusterProperties("local", - ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); - Properties properties = new Properties(); - properties.put(TopicService.DYNAMIC_TAGS_CREATION, "false"); - managedClusterProps.setConfig(properties); - - when(managedClusterProperties.stream()).thenReturn(Stream.of(managedClusterProps)); - when(schemaRegistryClient.getTags("local")) - .thenReturn(Mono.just(List.of(TagInfo.builder().name("TAG_TEST").build()))); - - List validationErrors = topicService.validateTags( - Namespace.builder().metadata( - ObjectMeta.builder().name("namespace").cluster("local").build()).build(), - Topic.builder() - .metadata(ObjectMeta.builder().name("ns-topic1").build()) - .spec(Topic.TopicSpec.builder() - .tags(List.of("BAD_TAG", "TAG_TEST")).build()).build()); - assertEquals(1, validationErrors.size()); - assertEquals("Invalid value BAD_TAG for tags: Available tags are TAG_TEST.", validationErrors.get(0)); - } } From 3b3c7ecd6e53febed620c7907e8f8001018b3e2c Mon Sep 17 00:00:00 2001 From: E046899 Date: Thu, 14 Dec 2023 17:58:02 +0100 Subject: [PATCH 12/15] better managment of tags --- .../services/executors/TopicAsyncExecutorTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java index c044a210..72401356 100644 --- a/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java @@ -106,8 +106,11 @@ void shouldCreateTags() { properties.put(CLUSTER_ID, CLUSTER_ID_TEST); when(schemaRegistryClient.associateTags(anyString(), anyList())) - .thenReturn(Mono.empty()) - .thenReturn(Mono.error(new Exception("error"))); + .thenReturn(Mono.empty()) + .thenReturn(Mono.error(new Exception("error"))); + when(schemaRegistryClient.createTags(anyList(), anyString())) + .thenReturn(Mono.just(List.of())) + .thenReturn(Mono.error(new Exception("error"))); when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER); when(managedClusterProperties.getConfig()).thenReturn(properties); From a06ccc3910f2ec734c891fc039ba85249b25d9b2 Mon Sep 17 00:00:00 2001 From: E046899 Date: Fri, 15 Dec 2023 09:29:38 +0100 Subject: [PATCH 13/15] manage feedbacks --- README.md | 34 +++++++------- .../executors/TopicAsyncExecutor.java | 19 ++++---- .../ns4kafka/services/TopicServiceTest.java | 19 -------- .../executors/TopicAsyncExecutorTest.java | 44 +++++++++++++++++-- 4 files changed, 67 insertions(+), 49 deletions(-) diff --git a/README.md b/README.md index f65f6c14..a0834c1d 100644 --- a/README.md +++ b/README.md @@ -16,24 +16,22 @@ using [Kafkactl](https://github.com/michelin/kafkactl), which follows best pract ## Table of Contents -- [Ns4Kafka](#ns4kafka) - - [Table of Contents](#table-of-contents) - - [Principles](#principles) - - [Namespace Isolation](#namespace-isolation) - - [Desired State](#desired-state) - - [Server Side Validation](#server-side-validation) - - [CLI](#cli) - - [Download](#download) - - [Install](#install) - - [Demo Environment](#demo-environment) - - [Configuration](#configuration) - - [GitLab Authentication](#gitlab-authentication) - - [Admin Account](#admin-account) - - [Kafka Broker Authentication](#kafka-broker-authentication) - - [Managed clusters](#managed-clusters) - - [AKHQ](#akhq) - - [Administration](#administration) - - [Contribution](#contribution) +* [Principles](#principles) + * [Namespace Isolation](#namespace-isolation) + * [Desired State](#desired-state) + * [Server Side Validation](#server-side-validation) + * [CLI](#cli) +* [Download](#download) +* [Install](#install) +* [Demo Environment](#demo-environment) +* [Configuration](#configuration) + * [GitLab Authentication](#gitlab-authentication) + * [Admin Account](#admin-account) + * [Kafka Broker Authentication](#kafka-broker-authentication) + * [Managed clusters](#managed-clusters) + * [AKHQ](#akhq) +* [Administration](#administration) +* [Contribution](#contribution) ## Principles diff --git a/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java index 577d0491..442e6d1e 100644 --- a/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java +++ b/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java @@ -398,14 +398,17 @@ private void createAndAssociateTags(List tagsToAssociate) { .toList()); schemaRegistryClient.createTags(tagsToCreate, managedClusterProperties.getName()) - .subscribe(successCreation -> { - schemaRegistryClient.associateTags(managedClusterProperties.getName(), tagsToAssociate) - .subscribe( - successAssociation -> - log.info(String.format("Success associating tag %s.", stringTags)), - error -> - log.error(String.format("Error associating tag %s.", stringTags), error)); - }, error -> log.error(String.format("Error creating tag %s.", stringTags), error)); + .subscribe( + successCreation -> + schemaRegistryClient.associateTags( + managedClusterProperties.getName(), + tagsToAssociate) + .subscribe( + successAssociation -> + log.info(String.format("Success associating tag %s.", stringTags)), + error -> + log.error(String.format("Error associating tag %s.", stringTags), error)), + error -> log.error(String.format("Error creating tag %s.", stringTags), error)); } /** diff --git a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java index 224cf0a5..6ee95d6a 100644 --- a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java @@ -902,25 +902,6 @@ void shouldTagsBeValid() { assertEquals(0, validationErrors.size()); } - @Test - void shouldTagsBeCreated() { - ManagedClusterProperties managedClusterProps = - new ManagedClusterProperties("local", - ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); - Properties properties = new Properties(); - managedClusterProps.setConfig(properties); - - when(managedClusterProperties.stream()).thenReturn(Stream.of(managedClusterProps)); - - List validationErrors = topicService.validateTags( - Namespace.builder().metadata( - ObjectMeta.builder().name("namespace").cluster("local").build()).build(), - Topic.builder().metadata( - ObjectMeta.builder().name("ns-topic1").build()).spec(Topic.TopicSpec.builder() - .tags(List.of("TAG_TEST")).build()).build()); - assertEquals(0, validationErrors.size()); - } - @Test void shouldTagsBeInvalidWhenNotConfluentCloud() { Namespace ns = Namespace.builder() diff --git a/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java index 72401356..a9af9409 100644 --- a/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java @@ -20,6 +20,7 @@ import com.michelin.ns4kafka.services.clients.schema.entities.TagEntities; import com.michelin.ns4kafka.services.clients.schema.entities.TagEntity; import io.micronaut.http.HttpResponse; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Properties; @@ -106,11 +107,9 @@ void shouldCreateTags() { properties.put(CLUSTER_ID, CLUSTER_ID_TEST); when(schemaRegistryClient.associateTags(anyString(), anyList())) - .thenReturn(Mono.empty()) - .thenReturn(Mono.error(new Exception("error"))); + .thenReturn(Mono.just(List.of())); when(schemaRegistryClient.createTags(anyList(), anyString())) - .thenReturn(Mono.just(List.of())) - .thenReturn(Mono.error(new Exception("error"))); + .thenReturn(Mono.just(List.of())); when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER); when(managedClusterProperties.getConfig()).thenReturn(properties); @@ -141,6 +140,43 @@ void shouldCreateTags() { && tags.get(0).entityType().equals(TOPIC_ENTITY_TYPE))); } + @Test + void shouldNotAssociateTagsWhenCreationFails() { + Properties properties = new Properties(); + properties.put(CLUSTER_ID, CLUSTER_ID_TEST); + + when(schemaRegistryClient.createTags(anyList(), anyString())) + .thenReturn(Mono.error(new IOException())); + when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER); + when(managedClusterProperties.getConfig()).thenReturn(properties); + + List ns4kafkaTopics = List.of( + Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .tags(List.of(TAG1)) + .build()) + .build()); + + Map brokerTopics = Map.of(TOPIC_NAME, + Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .build()) + .build()); + + topicAsyncExecutor.alterTags(ns4kafkaTopics, brokerTopics); + + verify(schemaRegistryClient, never()).associateTags(eq(LOCAL_CLUSTER), argThat(tags -> + tags.get(0).entityName().equals(CLUSTER_ID_TEST + ":" + TOPIC_NAME) + && tags.get(0).typeName().equals(TAG1) + && tags.get(0).entityType().equals(TOPIC_ENTITY_TYPE))); + } + @Test void shouldBeConfluentCloud() { when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); From 7d3e86b4e05e4f525ccb316cfbe8cdd16084a296 Mon Sep 17 00:00:00 2001 From: E046899 Date: Fri, 15 Dec 2023 09:31:59 +0100 Subject: [PATCH 14/15] manage feedbacks --- README.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index a0834c1d..42e2dc32 100644 --- a/README.md +++ b/README.md @@ -17,19 +17,19 @@ using [Kafkactl](https://github.com/michelin/kafkactl), which follows best pract ## Table of Contents * [Principles](#principles) - * [Namespace Isolation](#namespace-isolation) - * [Desired State](#desired-state) - * [Server Side Validation](#server-side-validation) - * [CLI](#cli) + * [Namespace Isolation](#namespace-isolation) + * [Desired State](#desired-state) + * [Server Side Validation](#server-side-validation) + * [CLI](#cli) * [Download](#download) * [Install](#install) * [Demo Environment](#demo-environment) * [Configuration](#configuration) - * [GitLab Authentication](#gitlab-authentication) - * [Admin Account](#admin-account) - * [Kafka Broker Authentication](#kafka-broker-authentication) - * [Managed clusters](#managed-clusters) - * [AKHQ](#akhq) + * [GitLab Authentication](#gitlab-authentication) + * [Admin Account](#admin-account) + * [Kafka Broker Authentication](#kafka-broker-authentication) + * [Managed clusters](#managed-clusters) + * [AKHQ](#akhq) * [Administration](#administration) * [Contribution](#contribution) From e35cc7e7840f03599e2e25ca6ebf1bab3474d889 Mon Sep 17 00:00:00 2001 From: E046899 Date: Fri, 15 Dec 2023 09:43:22 +0100 Subject: [PATCH 15/15] manage feedbacks --- .../executors/TopicAsyncExecutorTest.java | 75 ++++++++++++++----- 1 file changed, 57 insertions(+), 18 deletions(-) diff --git a/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java index a9af9409..86313c73 100644 --- a/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java @@ -114,30 +114,69 @@ void shouldCreateTags() { when(managedClusterProperties.getConfig()).thenReturn(properties); List ns4kafkaTopics = List.of( - Topic.builder() - .metadata(ObjectMeta.builder() - .name(TOPIC_NAME) - .build()) - .spec(Topic.TopicSpec.builder() - .tags(List.of(TAG1)) - .build()) - .build()); + Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .tags(List.of(TAG1)) + .build()) + .build()); Map brokerTopics = Map.of(TOPIC_NAME, - Topic.builder() - .metadata(ObjectMeta.builder() - .name(TOPIC_NAME) - .build()) - .spec(Topic.TopicSpec.builder() - .build()) - .build()); + Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .build()) + .build()); topicAsyncExecutor.alterTags(ns4kafkaTopics, brokerTopics); verify(schemaRegistryClient).associateTags(eq(LOCAL_CLUSTER), argThat(tags -> - tags.get(0).entityName().equals(CLUSTER_ID_TEST + ":" + TOPIC_NAME) - && tags.get(0).typeName().equals(TAG1) - && tags.get(0).entityType().equals(TOPIC_ENTITY_TYPE))); + tags.get(0).entityName().equals(CLUSTER_ID_TEST + ":" + TOPIC_NAME) + && tags.get(0).typeName().equals(TAG1) + && tags.get(0).entityType().equals(TOPIC_ENTITY_TYPE))); + } + + @Test + void shouldCreateTagsButNotAssociateThem() { + Properties properties = new Properties(); + properties.put(CLUSTER_ID, CLUSTER_ID_TEST); + + when(schemaRegistryClient.associateTags(anyString(), anyList())) + .thenReturn(Mono.error(new IOException())); + when(schemaRegistryClient.createTags(anyList(), anyString())) + .thenReturn(Mono.just(List.of())); + when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER); + when(managedClusterProperties.getConfig()).thenReturn(properties); + + List ns4kafkaTopics = List.of( + Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .tags(List.of(TAG1)) + .build()) + .build()); + + Map brokerTopics = Map.of(TOPIC_NAME, + Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .build()) + .build()); + + topicAsyncExecutor.alterTags(ns4kafkaTopics, brokerTopics); + + verify(schemaRegistryClient).associateTags(eq(LOCAL_CLUSTER), argThat(tags -> + tags.get(0).entityName().equals(CLUSTER_ID_TEST + ":" + TOPIC_NAME) + && tags.get(0).typeName().equals(TAG1) + && tags.get(0).entityType().equals(TOPIC_ENTITY_TYPE))); } @Test