From 2767d2e9d2c302dbfdd91eb29e73c2d9a91cd4c3 Mon Sep 17 00:00:00 2001 From: E046899 Date: Tue, 3 Oct 2023 13:02:43 +0200 Subject: [PATCH] Manage Confluent tags --- .../ns4kafka/services/TopicService.java | 3 +- .../clients/schema/entities/TagSpecs.java | 6 + .../clients/schema/entities/TagTopicInfo.java | 6 + .../executors/TopicAsyncExecutor.java | 125 ++++++++++-------- .../ns4kafka/utils/config/ClusterConfig.java | 9 -- .../ns4kafka/utils/tags/TagsUtils.java | 8 -- .../services/executors/HttpResponseMock.java | 10 ++ .../executors/TopicAsyncExecutorTest.java | 41 +++--- 8 files changed, 110 insertions(+), 98 deletions(-) delete mode 100644 src/main/java/com/michelin/ns4kafka/utils/config/ClusterConfig.java delete mode 100644 src/main/java/com/michelin/ns4kafka/utils/tags/TagsUtils.java diff --git a/src/main/java/com/michelin/ns4kafka/services/TopicService.java b/src/main/java/com/michelin/ns4kafka/services/TopicService.java index 524a5927..e7a30b61 100644 --- a/src/main/java/com/michelin/ns4kafka/services/TopicService.java +++ b/src/main/java/com/michelin/ns4kafka/services/TopicService.java @@ -344,8 +344,7 @@ public List validateTags(Namespace namespace, Topic topic) { .map(tags -> tags.stream().map(TagInfo::name).collect(Collectors.toSet())).block(); if(tagNames == null || tagNames.isEmpty()) { - validationErrors.add(String.format("Invalid value (%s) for tags: No tags defined on the kafka cluster.", - String.join(",", topic.getSpec().getTags()))); + validationErrors.add(String.format("Invalid value (%s) for tags: No tags allowed.", String.join(",", topic.getSpec().getTags()))); return validationErrors; } diff --git a/src/main/java/com/michelin/ns4kafka/services/clients/schema/entities/TagSpecs.java b/src/main/java/com/michelin/ns4kafka/services/clients/schema/entities/TagSpecs.java index a991741a..865fd961 100644 --- a/src/main/java/com/michelin/ns4kafka/services/clients/schema/entities/TagSpecs.java +++ b/src/main/java/com/michelin/ns4kafka/services/clients/schema/entities/TagSpecs.java @@ -4,4 +4,10 @@ @Builder public record TagSpecs(String entityName, String entityType, String typeName) { + + @Override + public String toString() { + return entityName + "/" + typeName; + } + } diff --git a/src/main/java/com/michelin/ns4kafka/services/clients/schema/entities/TagTopicInfo.java b/src/main/java/com/michelin/ns4kafka/services/clients/schema/entities/TagTopicInfo.java index c3a9e393..921b6ee5 100644 --- a/src/main/java/com/michelin/ns4kafka/services/clients/schema/entities/TagTopicInfo.java +++ b/src/main/java/com/michelin/ns4kafka/services/clients/schema/entities/TagTopicInfo.java @@ -4,4 +4,10 @@ @Builder public record TagTopicInfo(String entityName, String entityType, String typeName, String entityStatus) { + + @Override + public String toString() { + return entityName + "/" + typeName; + } + } diff --git a/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java index a867c16c..e9fd43b1 100644 --- a/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java +++ b/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java @@ -13,7 +13,6 @@ import jakarta.inject.Singleton; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigResource; import java.time.Instant; @@ -24,7 +23,6 @@ import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.stream.Collectors; -import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.AlterConfigsResult; @@ -35,11 +33,6 @@ import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.admin.TopicListing; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ConfigResource; - -import static com.michelin.ns4kafka.utils.config.ClusterConfig.CLUSTER_ID; -import static com.michelin.ns4kafka.utils.tags.TagsUtils.TOPIC_ENTITY_TYPE; /** * Topic executor. @@ -49,6 +42,10 @@ @Singleton @AllArgsConstructor public class TopicAsyncExecutor { + + public static final String CLUSTER_ID = "cluster.id"; + public static final String TOPIC_ENTITY_TYPE = "kafka_topic"; + private final ManagedClusterProperties managedClusterProperties; @Inject @@ -85,43 +82,43 @@ public void synchronizeTopics() { List ns4kafkaTopics = topicRepository.findAllForCluster(managedClusterProperties.getName()); List toCreate = ns4kafkaTopics.stream() - .filter(topic -> !brokerTopics.containsKey(topic.getMetadata().getName())) - .toList(); + .filter(topic -> !brokerTopics.containsKey(topic.getMetadata().getName())) + .toList(); List toCheckConf = ns4kafkaTopics.stream() - .filter(topic -> brokerTopics.containsKey(topic.getMetadata().getName())) - .toList(); + .filter(topic -> brokerTopics.containsKey(topic.getMetadata().getName())) + .toList(); Map> toUpdate = toCheckConf.stream() - .map(topic -> { - Map actualConf = - brokerTopics.get(topic.getMetadata().getName()).getSpec().getConfigs(); - Map expectedConf = - topic.getSpec().getConfigs() == null ? Map.of() : topic.getSpec().getConfigs(); - Collection topicConfigChanges = computeConfigChanges(expectedConf, actualConf); - if (!topicConfigChanges.isEmpty()) { - ConfigResource cr = - new ConfigResource(ConfigResource.Type.TOPIC, topic.getMetadata().getName()); - return Map.entry(cr, topicConfigChanges); - } - return null; - }) - .filter(Objects::nonNull) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + .map(topic -> { + Map actualConf = + brokerTopics.get(topic.getMetadata().getName()).getSpec().getConfigs(); + Map expectedConf = + topic.getSpec().getConfigs() == null ? Map.of() : topic.getSpec().getConfigs(); + Collection topicConfigChanges = computeConfigChanges(expectedConf, actualConf); + if (!topicConfigChanges.isEmpty()) { + ConfigResource cr = + new ConfigResource(ConfigResource.Type.TOPIC, topic.getMetadata().getName()); + return Map.entry(cr, topicConfigChanges); + } + return null; + }) + .filter(Objects::nonNull) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); if (!toCreate.isEmpty()) { log.debug("Topic(s) to create: " - + String.join(",", toCreate.stream().map(topic -> topic.getMetadata().getName()).toList())); + + String.join(",", toCreate.stream().map(topic -> topic.getMetadata().getName()).toList())); } if (!toUpdate.isEmpty()) { log.debug("Topic(s) to update: " - + String.join(",", toUpdate.keySet().stream().map(ConfigResource::name).toList())); + + String.join(",", toUpdate.keySet().stream().map(ConfigResource::name).toList())); for (Map.Entry> e : toUpdate.entrySet()) { for (AlterConfigOp op : e.getValue()) { log.debug( - e.getKey().name() + " " + op.opType().toString() + " " + op.configEntry().name() + "(" - + op.configEntry().value() + ")"); + e.getKey().name() + " " + op.opType().toString() + " " + op.configEntry().name() + "(" + + op.configEntry().value() + ")"); } } } @@ -129,9 +126,10 @@ public void synchronizeTopics() { createTopics(toCreate); alterTopics(toUpdate, toCheckConf); - createTags(ns4kafkaTopics, brokerTopics); - deleteTags(ns4kafkaTopics, brokerTopics); - + if (managedClusterProperties.getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)) { + createTags(ns4kafkaTopics, brokerTopics); + deleteTags(ns4kafkaTopics, brokerTopics); + } } catch (ExecutionException | TimeoutException | CancellationException | KafkaStoreException e) { log.error("Error", e); } catch (InterruptedException e) { @@ -159,8 +157,14 @@ public void createTags(List ns4kafkaTopics, Map brokerTopi .build()); }).toList(); - if(!tagsToCreate.isEmpty()) { - schemaRegistryClient.addTags(managedClusterProperties.getName(), tagsToCreate).block(); + if (!tagsToCreate.isEmpty()) { + String stringTags = String.join(",", tagsToCreate + .stream() + .map(Record::toString) + .toList()); + schemaRegistryClient.addTags(managedClusterProperties.getName(), tagsToCreate) + .subscribe(success -> log.debug(String.format("Success creating tag %s.", stringTags)), + error -> log.error(String.format("Error creating tag %s.", stringTags))); } } @@ -170,20 +174,26 @@ public void createTags(List ns4kafkaTopics, Map brokerTopi * @param brokerTopics Topics from broker */ public void deleteTags(List ns4kafkaTopics, Map brokerTopics) { - - List tagsToDelete = brokerTopics.values().stream().flatMap(brokerTopic -> { - Optional newTopic = ns4kafkaTopics.stream() - .filter(ns4kafkaTopic -> ns4kafkaTopic.getMetadata().getName().equals(brokerTopic.getMetadata().getName())) - .findFirst(); - List newTags = newTopic.isPresent() && newTopic.get().getSpec().getTags() != null ? newTopic.get().getSpec().getTags() : Collections.emptyList(); - List existingTags = brokerTopic.getSpec().getTags() != null ? brokerTopic.getSpec().getTags() : Collections.emptyList(); - - return existingTags.stream().filter(tag -> !newTags.contains(tag)).map(tag -> TagTopicInfo.builder() - .entityName(managedClusterProperties.getConfig().getProperty(CLUSTER_ID)+":"+brokerTopic.getMetadata().getName()) - .typeName(tag) - .entityType(TOPIC_ENTITY_TYPE) - .build()); - }).toList(); + List tagsToDelete = brokerTopics + .values() + .stream() + .flatMap(brokerTopic -> { + Optional newTopic = ns4kafkaTopics + .stream() + .filter(ns4kafkaTopic -> ns4kafkaTopic.getMetadata().getName().equals(brokerTopic.getMetadata().getName())) + .findFirst(); + + Set existingTags = new HashSet<>(brokerTopic.getSpec().getTags()); + Set newTags = newTopic.isPresent() ? new HashSet<>(newTopic.get().getSpec().getTags()) : Collections.emptySet(); + existingTags.removeAll(newTags); + return existingTags + .stream() + .map(tag -> TagTopicInfo.builder() + .entityName(managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":" + brokerTopic.getMetadata().getName()) + .typeName(tag) + .entityType(TOPIC_ENTITY_TYPE) + .build()); + }).toList(); tagsToDelete.forEach(tag -> schemaRegistryClient.deleteTag(managedClusterProperties.getName(), tag.entityName(), tag.typeName()).block()); } @@ -221,16 +231,15 @@ public List listBrokerTopicNames() throws InterruptedException, Executio } /** - * Complete topics with confluent tags + * Enrich topics with confluent tags * @param topics Topics to complete */ - public void completeWithTags(Map topics) { + public void enrichWithTags(Map topics) { if(managedClusterProperties.getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)) { - topics.entrySet().stream() - .forEach(entry -> - entry.getValue().getSpec().setTags(schemaRegistryClient.getTopicWithTags(managedClusterProperties.getName(), - managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":" + entry.getValue().getMetadata().getName()) - .block().stream().map(TagTopicInfo::typeName).toList())); + topics.forEach((key,value) -> + value.getSpec().setTags(schemaRegistryClient.getTopicWithTags(managedClusterProperties.getName(), + managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":" + value.getMetadata().getName()) + .block().stream().map(TagTopicInfo::typeName).toList())); } } @@ -279,8 +288,8 @@ public Map collectBrokerTopicsFromNames(List topicNames) .build() ) .collect(Collectors.toMap(topic -> topic.getMetadata().getName(), Function.identity())); - - completeWithTags(topics); + + enrichWithTags(topics); return topics; } diff --git a/src/main/java/com/michelin/ns4kafka/utils/config/ClusterConfig.java b/src/main/java/com/michelin/ns4kafka/utils/config/ClusterConfig.java deleted file mode 100644 index d05e79fd..00000000 --- a/src/main/java/com/michelin/ns4kafka/utils/config/ClusterConfig.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.michelin.ns4kafka.utils.config; - -public final class ClusterConfig { - - public static final String CLUSTER_ID = "cluster.id"; - - private ClusterConfig() { - } -} diff --git a/src/main/java/com/michelin/ns4kafka/utils/tags/TagsUtils.java b/src/main/java/com/michelin/ns4kafka/utils/tags/TagsUtils.java deleted file mode 100644 index a75e8f23..00000000 --- a/src/main/java/com/michelin/ns4kafka/utils/tags/TagsUtils.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.michelin.ns4kafka.utils.tags; - -public final class TagsUtils { - public static final String TOPIC_ENTITY_TYPE = "kafka_topic"; - - private TagsUtils() { - } -} diff --git a/src/test/java/com/michelin/ns4kafka/services/executors/HttpResponseMock.java b/src/test/java/com/michelin/ns4kafka/services/executors/HttpResponseMock.java index 5dea72d5..f91f2486 100644 --- a/src/test/java/com/michelin/ns4kafka/services/executors/HttpResponseMock.java +++ b/src/test/java/com/michelin/ns4kafka/services/executors/HttpResponseMock.java @@ -27,4 +27,14 @@ public MutableConvertibleValues getAttributes() { public Optional getBody() { return Optional.empty(); } + + @Override + public String reason() { + return null; + } + + @Override + public int code() { + return 0; + } } diff --git a/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java index 956647e0..6b92d0f7 100644 --- a/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java @@ -1,8 +1,8 @@ package com.michelin.ns4kafka.services.executors; -import com.michelin.ns4kafka.config.KafkaAsyncExecutorConfig; import com.michelin.ns4kafka.models.ObjectMeta; import com.michelin.ns4kafka.models.Topic; +import com.michelin.ns4kafka.properties.ManagedClusterProperties; import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient; import com.michelin.ns4kafka.services.clients.schema.entities.TagSpecs; import com.michelin.ns4kafka.services.clients.schema.entities.TagTopicInfo; @@ -11,13 +11,12 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.testcontainers.shaded.org.hamcrest.Matchers; import reactor.core.publisher.Mono; import java.util.*; -import static com.michelin.ns4kafka.utils.config.ClusterConfig.CLUSTER_ID; -import static com.michelin.ns4kafka.utils.tags.TagsUtils.TOPIC_ENTITY_TYPE; +import static com.michelin.ns4kafka.services.executors.TopicAsyncExecutor.CLUSTER_ID; +import static com.michelin.ns4kafka.services.executors.TopicAsyncExecutor.TOPIC_ENTITY_TYPE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.mockito.ArgumentMatchers.anyList; @@ -36,7 +35,7 @@ class TopicAsyncExecutorTest { SchemaRegistryClient schemaRegistryClient; @Mock - KafkaAsyncExecutorConfig kafkaAsyncExecutorConfig; + ManagedClusterProperties managedClusterProperties; @InjectMocks TopicAsyncExecutor topicAsyncExecutor; @@ -45,11 +44,11 @@ class TopicAsyncExecutorTest { void createTagsShouldAddTags() { Properties properties = new Properties(); properties.put(CLUSTER_ID, CLUSTER_ID_TEST); - kafkaAsyncExecutorConfig.setConfig(properties); + managedClusterProperties.setConfig(properties); when(schemaRegistryClient.addTags(anyString(), anyList())).thenReturn(Mono.just(new ArrayList<>())); - when(kafkaAsyncExecutorConfig.getConfig()).thenReturn(properties); - when(kafkaAsyncExecutorConfig.getName()).thenReturn(LOCAL_CLUSTER); + when(managedClusterProperties.getConfig()).thenReturn(properties); + when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER); List ns4kafkaTopics = new ArrayList<>(); Topic ns4kafkaTopic = Topic.builder() @@ -79,7 +78,7 @@ void createTagsShouldAddTags() { void createTagsShouldNotAddTags() { Properties properties = new Properties(); properties.put(CLUSTER_ID,CLUSTER_ID_TEST); - kafkaAsyncExecutorConfig.setConfig(properties); + managedClusterProperties.setConfig(properties); List ns4kafkaTopics = new ArrayList<>(); Topic ns4kafkaTopic = Topic.builder() @@ -106,11 +105,11 @@ void createTagsShouldNotAddTags() { void deleteTagsShouldDeleteTags() { Properties properties = new Properties(); properties.put(CLUSTER_ID,CLUSTER_ID_TEST); - kafkaAsyncExecutorConfig.setConfig(properties); + managedClusterProperties.setConfig(properties); when(schemaRegistryClient.deleteTag(anyString(),anyString(),anyString())).thenReturn(Mono.just(new HttpResponseMock())); - when(kafkaAsyncExecutorConfig.getConfig()).thenReturn(properties); - when(kafkaAsyncExecutorConfig.getName()).thenReturn(LOCAL_CLUSTER); + when(managedClusterProperties.getConfig()).thenReturn(properties); + when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER); List ns4kafkaTopics = new ArrayList<>(); Topic ns4kafkaTopic = Topic.builder() @@ -137,7 +136,7 @@ void deleteTagsShouldDeleteTags() { void deleteTagsShouldNotDeleteTags() { Properties properties = new Properties(); properties.put(CLUSTER_ID,CLUSTER_ID_TEST); - kafkaAsyncExecutorConfig.setConfig(properties); + managedClusterProperties.setConfig(properties); List ns4kafkaTopics = new ArrayList<>(); Topic ns4kafkaTopic = Topic.builder() @@ -164,14 +163,14 @@ void deleteTagsShouldNotDeleteTags() { void completeWithTagsShouldComplete() { Properties properties = new Properties(); properties.put(CLUSTER_ID,CLUSTER_ID_TEST); - kafkaAsyncExecutorConfig.setConfig(properties); + managedClusterProperties.setConfig(properties); TagTopicInfo tagTopicInfo = TagTopicInfo.builder().typeName(TAG1).build(); when(schemaRegistryClient.getTopicWithTags(anyString(),anyString())).thenReturn(Mono.just(List.of(tagTopicInfo))); - when(kafkaAsyncExecutorConfig.getConfig()).thenReturn(properties); - when(kafkaAsyncExecutorConfig.getName()).thenReturn(LOCAL_CLUSTER); - when(kafkaAsyncExecutorConfig.getProvider()).thenReturn(KafkaAsyncExecutorConfig.KafkaProvider.CONFLUENT_CLOUD); + when(managedClusterProperties.getConfig()).thenReturn(properties); + when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER); + when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); Map brokerTopics = new HashMap<>(); Topic brokerTopic = Topic.builder() @@ -180,7 +179,7 @@ void completeWithTagsShouldComplete() { .spec(Topic.TopicSpec.builder().build()).build(); brokerTopics.put(TOPIC_NAME, brokerTopic); - topicAsyncExecutor.completeWithTags(brokerTopics); + topicAsyncExecutor.enrichWithTags(brokerTopics); assertEquals(TAG1,brokerTopics.get(TOPIC_NAME).getSpec().getTags().get(0)); } @@ -189,9 +188,9 @@ void completeWithTagsShouldComplete() { void completeWithTagsShouldNotComplete() { Properties properties = new Properties(); properties.put(CLUSTER_ID,CLUSTER_ID_TEST); - kafkaAsyncExecutorConfig.setConfig(properties); + managedClusterProperties.setConfig(properties); - when(kafkaAsyncExecutorConfig.getProvider()).thenReturn(KafkaAsyncExecutorConfig.KafkaProvider.SELF_MANAGED); + when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.SELF_MANAGED); Map brokerTopics = new HashMap<>(); Topic brokerTopic = Topic.builder() @@ -200,7 +199,7 @@ void completeWithTagsShouldNotComplete() { .spec(Topic.TopicSpec.builder().build()).build(); brokerTopics.put(TOPIC_NAME, brokerTopic); - topicAsyncExecutor.completeWithTags(brokerTopics); + topicAsyncExecutor.enrichWithTags(brokerTopics); assertNull(brokerTopics.get(TOPIC_NAME).getSpec().getTags()); }