From 55b8c8ed72d8d570a96ee34e5b7bc29134b836e5 Mon Sep 17 00:00:00 2001 From: E046899 Date: Tue, 3 Oct 2023 16:58:52 +0200 Subject: [PATCH] Manage Confluent tags --- .../executors/TopicAsyncExecutor.java | 54 +++++++++++-------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java index 9e9c2f83..eca9501c 100644 --- a/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java +++ b/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java @@ -135,7 +135,7 @@ public void synchronizeTopics() { createTopics(toCreate); alterTopics(toUpdate, toCheckConf); - if (managedClusterProperties.getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)) { + if (isConfluent()) { createTags(ns4kafkaTopics, brokerTopics); deleteTags(ns4kafkaTopics, brokerTopics); } @@ -154,25 +154,23 @@ public void synchronizeTopics() { * @param brokerTopics Topics from broker */ public void createTags(List ns4kafkaTopics, Map brokerTopics) { - List tagsToCreate = ns4kafkaTopics.stream().flatMap(ns4kafkaTopic -> { - Topic brokerTopic = brokerTopics.get(ns4kafkaTopic.getMetadata().getName()); - - List existingTags = brokerTopic != null && brokerTopic.getSpec().getTags() != null - ? brokerTopic.getSpec().getTags() - : Collections.emptyList(); - List newTags = ns4kafkaTopic.getSpec().getTags() != null - ? ns4kafkaTopic.getSpec().getTags() - : Collections.emptyList(); - - return newTags.stream().filter(tag -> !existingTags.contains(tag)).map(tag -> TagSpecs.builder() - .entityName( - managedClusterProperties.getConfig().getProperty(CLUSTER_ID) - + ":" - + ns4kafkaTopic.getMetadata().getName()) - .typeName(tag) - .entityType(TOPIC_ENTITY_TYPE) - .build()); - }).toList(); + List tagsToCreate = ns4kafkaTopics + .stream() + .filter(topic -> topic.getMetadata().getGeneration() == 1) + .flatMap(ns4kafkaTopic -> { + Topic brokerTopic = brokerTopics.get(ns4kafkaTopic.getMetadata().getName()); + Set existingTags = brokerTopic != null ? new HashSet<>(brokerTopic.getSpec().getTags()) : Collections.emptySet(); + Set newTags = new HashSet<>(ns4kafkaTopic.getSpec().getTags()); + newTags.removeAll(existingTags); + return newTags + .stream() + .map(tag -> TagSpecs.builder() + .entityName(managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":" + ns4kafkaTopic.getMetadata().getName()) + .typeName(tag) + .entityType(TOPIC_ENTITY_TYPE) + .build()); + }) + .toList(); if (!tagsToCreate.isEmpty()) { String stringTags = String.join(",", tagsToCreate @@ -225,6 +223,16 @@ public void deleteTags(List ns4kafkaTopics, Map brokerTopi managedClusterProperties.getName(), tag.entityName(), tag.typeName()).block()); + + if (!tagsToDelete.isEmpty()) { + tagsToDelete + .forEach(tag -> schemaRegistryClient.deleteTag( + managedClusterProperties.getName(), + tag.entityName(), + tag.typeName()) + .subscribe(success -> log.debug(String.format("Success deleting tag %s.", tag)), + error -> log.error(String.format("Error deleting tag %s.", tag)))); + } } /** @@ -266,7 +274,7 @@ public List listBrokerTopicNames() throws InterruptedException, Executio * @param topics Topics to complete */ public void enrichWithTags(Map topics) { - if (managedClusterProperties.getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)) { + if (isConfluent()) { topics.forEach((key, value) -> value.getSpec().setTags(schemaRegistryClient.getTopicWithTags(managedClusterProperties.getName(), managedClusterProperties @@ -277,6 +285,10 @@ public void enrichWithTags(Map topics) { } } + public boolean isConfluent() { + return managedClusterProperties.getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); + } + /** * Collect all topics on broker from a list of topic names. *