From f2a2a3009bd68a9ef31e088d5cf3ad8f72a2cc90 Mon Sep 17 00:00:00 2001 From: E046899 Date: Tue, 3 Oct 2023 12:15:45 +0200 Subject: [PATCH] Manage Confluent tags --- .../controllers/topic/TopicController.java | 6 +---- .../ns4kafka/services/TopicService.java | 11 ++++----- .../clients/schema/SchemaRegistryClient.java | 14 ++++------- .../executors/TopicAsyncExecutor.java | 23 ++++++++----------- .../ns4kafka/services/TopicServiceTest.java | 9 ++++---- 5 files changed, 24 insertions(+), 39 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java b/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java index 1fcc3705..89143b5a 100644 --- a/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java +++ b/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java @@ -21,11 +21,7 @@ import jakarta.inject.Inject; import jakarta.validation.Valid; import java.time.Instant; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; diff --git a/src/main/java/com/michelin/ns4kafka/services/TopicService.java b/src/main/java/com/michelin/ns4kafka/services/TopicService.java index 55d48333..524a5927 100644 --- a/src/main/java/com/michelin/ns4kafka/services/TopicService.java +++ b/src/main/java/com/michelin/ns4kafka/services/TopicService.java @@ -16,11 +16,8 @@ import io.micronaut.inject.qualifiers.Qualifiers; import jakarta.inject.Inject; import jakarta.inject.Singleton; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Optional; + +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -333,12 +330,12 @@ public Map deleteRecords(Topic topic, Map validateTags(Namespace namespace, Topic topic) { List validationErrors = new ArrayList<>(); - Optional topicCluster = kafkaAsyncExecutorConfig + Optional topicCluster = managedClusterProperties .stream() .filter(cluster -> namespace.getMetadata().getCluster().equals(cluster.getName())) .findFirst(); - if(topicCluster.isPresent() && !topicCluster.get().getProvider().equals(KafkaAsyncExecutorConfig.KafkaProvider.CONFLUENT_CLOUD)) { + if(topicCluster.isPresent() && !topicCluster.get().getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)) { validationErrors.add(String.format("Invalid value (%s) for tags: Tags are not currently supported.", String.join(",", topic.getSpec().getTags()))); return validationErrors; } diff --git a/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java b/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java index b4aa05b7..5685c7e0 100644 --- a/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java +++ b/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java @@ -1,11 +1,7 @@ package com.michelin.ns4kafka.services.clients.schema; import com.michelin.ns4kafka.properties.ManagedClusterProperties; -import com.michelin.ns4kafka.services.clients.schema.entities.SchemaCompatibilityCheckResponse; -import com.michelin.ns4kafka.services.clients.schema.entities.SchemaCompatibilityRequest; -import com.michelin.ns4kafka.services.clients.schema.entities.SchemaCompatibilityResponse; -import com.michelin.ns4kafka.services.clients.schema.entities.SchemaRequest; -import com.michelin.ns4kafka.services.clients.schema.entities.SchemaResponse; +import com.michelin.ns4kafka.services.clients.schema.entities.*; import com.michelin.ns4kafka.utils.exceptions.ResourceValidationException; import io.micronaut.core.type.Argument; import io.micronaut.core.util.StringUtils; @@ -179,7 +175,7 @@ public Mono deleteCurrentCompatibilityBySubject(Str * @return A list of tags */ public Mono> getTags(String kafkaCluster) { - KafkaAsyncExecutorConfig.RegistryConfig config = getSchemaRegistry(kafkaCluster); + ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); HttpRequest request = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/types/tagdefs"))) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class))); @@ -192,7 +188,7 @@ public Mono> getTags(String kafkaCluster) { * @return A list of tags */ public Mono> getTopicWithTags(String kafkaCluster, String entityName) { - KafkaAsyncExecutorConfig.RegistryConfig config = getSchemaRegistry(kafkaCluster); + ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); HttpRequest request = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags"))) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class))); @@ -205,7 +201,7 @@ public Mono> getTopicWithTags(String kafkaCluster, String ent * @return Information about added tags */ public Mono> addTags(String kafkaCluster, List tagSpecs) { - KafkaAsyncExecutorConfig.RegistryConfig config = getSchemaRegistry(kafkaCluster); + ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); HttpRequest request = HttpRequest.POST(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/entity/tags")), tagSpecs) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class))); @@ -219,7 +215,7 @@ public Mono> addTags(String kafkaCluster, List tagS * @return The resume response */ public Mono> deleteTag(String kafkaCluster, String entityName, String tagName) { - KafkaAsyncExecutorConfig.RegistryConfig config = getSchemaRegistry(kafkaCluster); + ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); HttpRequest request = HttpRequest.DELETE(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags/" + tagName))) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); return Mono.from(httpClient.exchange(request, Void.class)); diff --git a/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java index 8730d456..a867c16c 100644 --- a/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java +++ b/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java @@ -17,12 +17,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigResource; import java.time.Instant; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -56,8 +51,10 @@ public class TopicAsyncExecutor { private final ManagedClusterProperties managedClusterProperties; + @Inject TopicRepository topicRepository; + @Inject SchemaRegistryClient schemaRegistryClient; public TopicAsyncExecutor(ManagedClusterProperties managedClusterProperties) { @@ -156,14 +153,14 @@ public void createTags(List ns4kafkaTopics, Map brokerTopi List newTags = ns4kafkaTopic.getSpec().getTags() != null ? ns4kafkaTopic.getSpec().getTags() : Collections.emptyList(); return newTags.stream().filter(tag -> !existingTags.contains(tag)).map(tag -> TagSpecs.builder() - .entityName(kafkaAsyncExecutorConfig.getConfig().getProperty(CLUSTER_ID)+":"+ns4kafkaTopic.getMetadata().getName()) + .entityName(managedClusterProperties.getConfig().getProperty(CLUSTER_ID)+":"+ns4kafkaTopic.getMetadata().getName()) .typeName(tag) .entityType(TOPIC_ENTITY_TYPE) .build()); }).toList(); if(!tagsToCreate.isEmpty()) { - schemaRegistryClient.addTags(kafkaAsyncExecutorConfig.getName(), tagsToCreate).block(); + schemaRegistryClient.addTags(managedClusterProperties.getName(), tagsToCreate).block(); } } @@ -182,13 +179,13 @@ public void deleteTags(List ns4kafkaTopics, Map brokerTopi List existingTags = brokerTopic.getSpec().getTags() != null ? brokerTopic.getSpec().getTags() : Collections.emptyList(); return existingTags.stream().filter(tag -> !newTags.contains(tag)).map(tag -> TagTopicInfo.builder() - .entityName(kafkaAsyncExecutorConfig.getConfig().getProperty(CLUSTER_ID)+":"+brokerTopic.getMetadata().getName()) + .entityName(managedClusterProperties.getConfig().getProperty(CLUSTER_ID)+":"+brokerTopic.getMetadata().getName()) .typeName(tag) .entityType(TOPIC_ENTITY_TYPE) .build()); }).toList(); - tagsToDelete.forEach(tag -> schemaRegistryClient.deleteTag(kafkaAsyncExecutorConfig.getName(), tag.entityName(), tag.typeName()).block()); + tagsToDelete.forEach(tag -> schemaRegistryClient.deleteTag(managedClusterProperties.getName(), tag.entityName(), tag.typeName()).block()); } /** @@ -228,11 +225,11 @@ public List listBrokerTopicNames() throws InterruptedException, Executio * @param topics Topics to complete */ public void completeWithTags(Map topics) { - if(kafkaAsyncExecutorConfig.getProvider().equals(KafkaAsyncExecutorConfig.KafkaProvider.CONFLUENT_CLOUD)) { + if(managedClusterProperties.getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)) { topics.entrySet().stream() .forEach(entry -> - entry.getValue().getSpec().setTags(schemaRegistryClient.getTopicWithTags(kafkaAsyncExecutorConfig.getName(), - kafkaAsyncExecutorConfig.getConfig().getProperty(CLUSTER_ID) + ":" + entry.getValue().getMetadata().getName()) + entry.getValue().getSpec().setTags(schemaRegistryClient.getTopicWithTags(managedClusterProperties.getName(), + managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":" + entry.getValue().getMetadata().getName()) .block().stream().map(TagTopicInfo::typeName).toList())); } } diff --git a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java index 8d1bec45..a0168da3 100644 --- a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java @@ -13,7 +13,6 @@ import com.michelin.ns4kafka.models.Topic; import com.michelin.ns4kafka.properties.ManagedClusterProperties; import com.michelin.ns4kafka.repositories.TopicRepository; -import com.michelin.ns4kafka.config.KafkaAsyncExecutorConfig; import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient; import com.michelin.ns4kafka.services.clients.schema.entities.TagInfo; import com.michelin.ns4kafka.services.executors.TopicAsyncExecutor; @@ -902,7 +901,7 @@ void shouldTagsBeValid() { List tagInfo = List.of(TagInfo.builder().name("TAG_TEST").build()); - when(kafkaAsyncExecutorConfigs.stream()).thenReturn(Stream.of(new KafkaAsyncExecutorConfig("local", KafkaAsyncExecutorConfig.KafkaProvider.CONFLUENT_CLOUD))); + when(managedClusterProperties.stream()).thenReturn(Stream.of(new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD))); when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(tagInfo)); List validationErrors = topicService.validateTags(ns, topic); @@ -924,7 +923,7 @@ void shouldTagsBeInvalidWhenNotConfluentCloud() { .tags(List.of("TAG_TEST")).build()) .build(); - when(kafkaAsyncExecutorConfigs.stream()).thenReturn(Stream.of(new KafkaAsyncExecutorConfig("local", KafkaAsyncExecutorConfig.KafkaProvider.SELF_MANAGED))); + when(managedClusterProperties.stream()).thenReturn(Stream.of(new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.SELF_MANAGED))); List validationErrors = topicService.validateTags(ns, topic); assertEquals(1, validationErrors.size()); @@ -946,7 +945,7 @@ void shouldTagsBeInvalidWhenNoTagsAllowed() { .tags(List.of("TAG_TEST")).build()) .build(); - when(kafkaAsyncExecutorConfigs.stream()).thenReturn(Stream.of(new KafkaAsyncExecutorConfig("local", KafkaAsyncExecutorConfig.KafkaProvider.CONFLUENT_CLOUD))); + when(managedClusterProperties.stream()).thenReturn(Stream.of(new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD))); when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(Collections.emptyList())); List validationErrors = topicService.validateTags(ns, topic); @@ -971,7 +970,7 @@ void shouldTagsBeInvalidWhenNotAllowed() { List tagInfo = List.of(TagInfo.builder().name("TAG_TEST").build()); - when(kafkaAsyncExecutorConfigs.stream()).thenReturn(Stream.of(new KafkaAsyncExecutorConfig("local", KafkaAsyncExecutorConfig.KafkaProvider.CONFLUENT_CLOUD))); + when(managedClusterProperties.stream()).thenReturn(Stream.of(new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD))); when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(tagInfo)); List validationErrors = topicService.validateTags(ns, topic);