diff --git a/README.md b/README.md
index 4cdb9214..42e2dc32 100644
--- a/README.md
+++ b/README.md
@@ -188,6 +188,7 @@ ns4kafka:
           sasl.mechanism: "PLAIN"
           security.protocol: "SASL_PLAINTEXT"
           sasl.jaas.config: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin\";"
+          cluster.id: "lkc-abcde"
       schema-registry:
         url: "http://localhost:8081"
         basicAuthUsername: "user"
@@ -202,21 +203,22 @@ ns4kafka:
 The name for each managed cluster has to be unique. This is the name you have to set in the field **metadata.cluster** of your namespace descriptors.
 
-| Property                                 | type    | description                                                 |
-|------------------------------------------|---------|-------------------------------------------------------------|
-| manage-users                             | boolean | Does the cluster manages users ?                            |
-| manage-acls                              | boolean | Does the cluster manages access control entries ?           |
-| manage-topics                            | boolean | Does the cluster manages topics ?                           |
-| manage-connectors                        | boolean | Does the cluster manages connects ?                         |
-| drop-unsync-acls                         | boolean | Should Ns4Kafka drop unsynchronized ACLs                    |
-| provider                                 | boolean | The kind of cluster. Either SELF_MANAGED or CONFLUENT_CLOUD |
-| config.bootstrap.servers                 | string  | The location of the clusters servers                        |
-| schema-registry.url                      | string  | The location of the Schema Registry                         |
-| schema-registry.basicAuthUsername        | string  | Basic authentication username to the Schema Registry        |
-| schema-registry.basicAuthPassword        | string  | Basic authentication password to the Schema Registry        |
-| connects.connect-name.url                | string  | The location of the kafka connect                           |
-| connects.connect-name.basicAuthUsername  | string  | Basic authentication username to the Kafka Connect          |
-| connects.connect-name.basicAuthPassword  | string  | Basic authentication password to the Kafka Connect          |
+| Property                                 | Type    | Description                                                                                                                            |
+|------------------------------------------|---------|----------------------------------------------------------------------------------------------------------------------------------------|
+| manage-users                             | boolean | Does the cluster manage users?                                                                                                         |
+| manage-acls                              | boolean | Does the cluster manage access control entries?                                                                                        |
+| manage-topics                            | boolean | Does the cluster manage topics?                                                                                                        |
+| manage-connectors                        | boolean | Does the cluster manage connectors?                                                                                                    |
+| drop-unsync-acls                         | boolean | Should Ns4Kafka drop unsynchronized ACLs?                                                                                              |
+| provider                                 | string  | The kind of cluster. Either SELF_MANAGED or CONFLUENT_CLOUD.                                                                           |
+| config.bootstrap.servers                 | string  | The location of the cluster's servers.                                                                                                 |
+| config.cluster.id                        | string  | The cluster ID. Required to use [Confluent Cloud tags](https://docs.confluent.io/cloud/current/stream-governance/stream-catalog.html). |
+| schema-registry.url                      | string  | The location of the Schema Registry.                                                                                                   |
+| schema-registry.basicAuthUsername        | string  | Basic authentication username to the Schema Registry.                                                                                  |
+| schema-registry.basicAuthPassword        | string  | Basic authentication password to the Schema Registry.                                                                                  |
+| connects.connect-name.url                | string  | The location of the Kafka Connect.                                                                                                     |
+| connects.connect-name.basicAuthUsername  | string  | Basic authentication username to the Kafka Connect.                                                                                    |
+| connects.connect-name.basicAuthPassword  | string  | Basic authentication password to the Kafka Connect.                                                                                    |
 
 The configuration will depend on the authentication method selected for your broker, schema registry and Kafka Connect.
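With `provider: "CONFLUENT_CLOUD"` and `config.cluster.id` set, topic descriptors can carry Confluent Cloud catalog tags in their spec. A sketch of such a descriptor — the topic name and the `PII` tag are placeholders, and each tag must already be defined in the Confluent Cloud stream catalog (the new `validateTags` check below rejects unknown tags):

    apiVersion: v1
    kind: Topic
    metadata:
      name: myPrefix.orders
    spec:
      replicationFactor: 3
      partitions: 3
      tags:
        - PII
      configs:
        min.insync.replicas: "2"
        cleanup.policy: "delete"
        retention.ms: "60000"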
diff --git a/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java b/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java
index 16b5876d..89d63d66 100644
--- a/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java
+++ b/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java
@@ -22,13 +22,13 @@
 import jakarta.validation.Valid;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
 import org.apache.kafka.common.TopicPartition;
 
 /**
@@ -108,6 +108,13 @@ public HttpResponse<Topic> apply(String namespace, @Valid @Body Topic topic,
             validationErrors.addAll(topicService.validateTopicUpdate(ns, existingTopic.get(), topic));
         }
 
+        List<String> existingTags = existingTopic
+            .map(oldTopic -> oldTopic.getSpec().getTags())
+            .orElse(Collections.emptyList());
+        if (topic.getSpec().getTags().stream().anyMatch(newTag -> !existingTags.contains(newTag))) {
+            validationErrors.addAll(topicService.validateTags(ns, topic));
+        }
+
         if (!validationErrors.isEmpty()) {
             throw new ResourceValidationException(validationErrors, topic.getKind(), topic.getMetadata().getName());
         }
@@ -268,6 +275,6 @@ public List<DeleteRecordsResponse> deleteRecords(String namespace, String topic,
                     .offset(entry.getValue())
                     .build())
                 .build())
-            .collect(Collectors.toList());
+            .toList();
     }
 }
diff --git a/src/main/java/com/michelin/ns4kafka/models/ObjectMeta.java b/src/main/java/com/michelin/ns4kafka/models/ObjectMeta.java
index bfd73315..a3479fbd 100644
--- a/src/main/java/com/michelin/ns4kafka/models/ObjectMeta.java
+++ b/src/main/java/com/michelin/ns4kafka/models/ObjectMeta.java
@@ -32,5 +32,4 @@ public class ObjectMeta {
     @EqualsAndHashCode.Exclude
     @JsonFormat(shape = JsonFormat.Shape.STRING)
     private Date creationTimestamp;
-
 }
diff --git a/src/main/java/com/michelin/ns4kafka/models/Topic.java b/src/main/java/com/michelin/ns4kafka/models/Topic.java
index eda33ca1..f3856741 100644
--- a/src/main/java/com/michelin/ns4kafka/models/Topic.java
+++ b/src/main/java/com/michelin/ns4kafka/models/Topic.java
@@ -1,12 +1,16 @@
 package com.michelin.ns4kafka.models;
 
 import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonSetter;
+import com.fasterxml.jackson.annotation.Nulls;
 import io.micronaut.core.annotation.Introspected;
 import io.swagger.v3.oas.annotations.media.Schema;
 import jakarta.validation.Valid;
 import jakarta.validation.constraints.NotNull;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -32,6 +36,7 @@ public class Topic {
     @NotNull
     private ObjectMeta metadata;
 
+    @Valid
     @NotNull
     private TopicSpec spec;
 
@@ -52,11 +57,15 @@ public enum TopicPhase {
      */
    @Data
    @Builder
+    @Introspected
    @NoArgsConstructor
    @AllArgsConstructor
    public static class TopicSpec {
        private int replicationFactor;
        private int partitions;
+        @Builder.Default
+        @JsonSetter(nulls = Nulls.AS_EMPTY)
+        private List<String> tags = new ArrayList<>();
        private Map<String, String> configs;
    }
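The `@JsonSetter(nulls = Nulls.AS_EMPTY)` annotation is what lets the controller code above iterate `topic.getSpec().getTags()` without a null check: an explicit `"tags": null` in a descriptor is coerced to an empty list during deserialization. A minimal standalone Jackson sketch of that behavior (outside of any Micronaut wiring):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.michelin.ns4kafka.models.Topic;

    class TagsNullCoercionSketch {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // Nulls.AS_EMPTY turns an explicit JSON null into an empty list,
            // so downstream code never sees a null tags field.
            Topic.TopicSpec spec = mapper.readValue("{\"tags\": null}", Topic.TopicSpec.class);
            System.out.println(spec.getTags().isEmpty()); // true
        }
    }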
diff --git a/src/main/java/com/michelin/ns4kafka/services/TopicService.java b/src/main/java/com/michelin/ns4kafka/services/TopicService.java
index 0660093b..fb8479a5 100644
--- a/src/main/java/com/michelin/ns4kafka/services/TopicService.java
+++ b/src/main/java/com/michelin/ns4kafka/services/TopicService.java
@@ -9,6 +9,8 @@
 import com.michelin.ns4kafka.models.Topic;
 import com.michelin.ns4kafka.properties.ManagedClusterProperties;
 import com.michelin.ns4kafka.repositories.TopicRepository;
+import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient;
+import com.michelin.ns4kafka.services.clients.schema.entities.TagInfo;
 import com.michelin.ns4kafka.services.executors.TopicAsyncExecutor;
 import io.micronaut.context.ApplicationContext;
 import io.micronaut.inject.qualifiers.Qualifiers;
@@ -19,6 +21,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -42,6 +45,9 @@ public class TopicService {
     @Inject
     List<ManagedClusterProperties> managedClusterProperties;
 
+    @Inject
+    SchemaRegistryClient schemaRegistryClient;
+
     /**
      * Find all topics.
      *
@@ -195,7 +201,6 @@ public List<String> validateTopicUpdate(Namespace namespace, Topic existingTopic
                     + "Please create a new topic with `compact` policy specified instead.",
                 newTopic.getSpec().getConfigs().get(CLEANUP_POLICY_CONFIG)));
         }
-
         return validationErrors;
     }
 
@@ -319,4 +324,52 @@ public Map<TopicPartition, Long> deleteRecords(Topic topic, Map<TopicPartition,
+
+    /**
+     * Validate the tags of a topic.
+     *
+     * @param namespace The namespace
+     * @param topic     The topic to validate
+     * @return A list of validation errors
+     */
+    public List<String> validateTags(Namespace namespace, Topic topic) {
+        List<String> validationErrors = new ArrayList<>();
+
+        Optional<ManagedClusterProperties> topicCluster = managedClusterProperties
+            .stream()
+            .filter(cluster -> namespace.getMetadata().getCluster().equals(cluster.getName()))
+            .findFirst();
+
+        if (topicCluster.isPresent()
+            && !topicCluster.get().getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)) {
+            validationErrors.add(String.format(
+                "Invalid value %s for tags: Tags are not currently supported.",
+                String.join(", ", topic.getSpec().getTags())));
+            return validationErrors;
+        }
+
+        Set<String> tagNames = schemaRegistryClient.getTags(namespace.getMetadata().getCluster())
+            .map(tags -> tags.stream().map(TagInfo::name).collect(Collectors.toSet())).block();
+
+        if (tagNames == null || tagNames.isEmpty()) {
+            validationErrors.add(String.format(
+                "Invalid value %s for tags: No tags allowed.",
+                String.join(", ", topic.getSpec().getTags())));
+            return validationErrors;
+        }
+
+        List<String> unavailableTagNames = topic.getSpec().getTags()
+            .stream()
+            .filter(tagName -> !tagNames.contains(tagName))
+            .toList();
+
+        if (!unavailableTagNames.isEmpty()) {
+            validationErrors.add(String.format(
+                "Invalid value %s for tags: Available tags are %s.",
+                String.join(", ", unavailableTagNames),
+                String.join(", ", tagNames)));
+        }
+
+        return validationErrors;
+    }
 }
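The null guard after `block()` in `validateTags` deserves a note: an empty `Mono` completes without emitting a value, and `block()` then returns `null` rather than an empty set, so both cases must be handled. A standalone Reactor sketch of the two outcomes:

    import java.util.Set;
    import reactor.core.publisher.Mono;

    class BlockOnEmptySketch {
        public static void main(String[] args) {
            // block() on an empty Mono returns null, not an empty collection...
            Set<String> fromEmpty = Mono.<Set<String>>empty().block();
            System.out.println(fromEmpty); // null
            // ...while a Mono carrying a value returns that value.
            Set<String> fromValue = Mono.just(Set.of("PII")).block();
            System.out.println(fromValue); // [PII]
        }
    }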
diff --git a/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java b/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java
index 7b8c88fa..a7093c29 100644
--- a/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java
+++ b/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java
@@ -6,9 +6,13 @@
 import com.michelin.ns4kafka.services.clients.schema.entities.SchemaCompatibilityResponse;
 import com.michelin.ns4kafka.services.clients.schema.entities.SchemaRequest;
 import com.michelin.ns4kafka.services.clients.schema.entities.SchemaResponse;
+import com.michelin.ns4kafka.services.clients.schema.entities.TagInfo;
+import com.michelin.ns4kafka.services.clients.schema.entities.TagTopicInfo;
 import com.michelin.ns4kafka.utils.exceptions.ResourceValidationException;
+import io.micronaut.core.type.Argument;
 import io.micronaut.core.util.StringUtils;
 import io.micronaut.http.HttpRequest;
+import io.micronaut.http.HttpResponse;
 import io.micronaut.http.HttpStatus;
 import io.micronaut.http.MutableHttpRequest;
 import io.micronaut.http.client.HttpClient;
@@ -171,6 +175,73 @@ public Mono<SchemaCompatibilityResponse> deleteCurrentCompatibilityBySubject(Str
         return Mono.from(httpClient.retrieve(request, SchemaCompatibilityResponse.class));
     }
 
+    /**
+     * List the tags of a Kafka cluster.
+     *
+     * @param kafkaCluster The Kafka cluster
+     * @return A list of tags
+     */
+    public Mono<List<TagInfo>> getTags(String kafkaCluster) {
+        ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
+        HttpRequest<?> request = HttpRequest
+            .GET(URI.create(StringUtils.prependUri(
+                config.getUrl(), "/catalog/v1/types/tagdefs")))
+            .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
+        return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class)));
+    }
+
+    /**
+     * List the tags of a topic.
+     *
+     * @param kafkaCluster The Kafka cluster
+     * @param entityName   The topic's name for the API
+     * @return A list of tags
+     */
+    public Mono<List<TagTopicInfo>> getTopicWithTags(String kafkaCluster, String entityName) {
+        ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
+        HttpRequest<?> request = HttpRequest
+            .GET(URI.create(StringUtils.prependUri(
+                config.getUrl(),
+                "/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags")))
+            .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
+        return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class)));
+    }
+
+    /**
+     * Add tags to a topic.
+     *
+     * @param kafkaCluster The Kafka cluster
+     * @param tagSpecs     Tags to add
+     * @return Information about the added tags
+     */
+    public Mono<List<TagTopicInfo>> addTags(String kafkaCluster, List<TagTopicInfo> tagSpecs) {
+        ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
+        HttpRequest<?> request = HttpRequest
+            .POST(URI.create(StringUtils.prependUri(
+                config.getUrl(),
+                "/catalog/v1/entity/tags")), tagSpecs)
+            .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
+        return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class)));
+    }
+
+    /**
+     * Delete a tag from a topic.
+     *
+     * @param kafkaCluster The Kafka cluster
+     * @param entityName   The topic's name
+     * @param tagName      The tag to delete
+     * @return The deletion response
+     */
+    public Mono<HttpResponse<Void>> deleteTag(String kafkaCluster, String entityName, String tagName) {
+        ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
+        HttpRequest<?> request = HttpRequest
+            .DELETE(URI.create(StringUtils.prependUri(
+                config.getUrl(),
+                "/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags/" + tagName)))
+            .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
+        return Mono.from(httpClient.exchange(request, Void.class));
+    }
+
     /**
      * Get the schema registry of the given Kafka cluster.
      *
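A hypothetical usage of the new client methods — this is a fragment, not a full class, and assumes an injected `SchemaRegistryClient` plus a managed cluster named "local"; `defaultIfEmpty` avoids the null-on-empty `block()` pitfall noted earlier:

    List<TagInfo> tags = schemaRegistryClient.getTags("local")
        .defaultIfEmpty(List.of())
        .block();
    tags.forEach(tag -> System.out.println("Available tag: " + tag.name()));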
+ *
+ * @param name Tag name
+ */
+@Builder
+public record TagInfo(String name) {
+}
\ No newline at end of file
diff --git a/src/main/java/com/michelin/ns4kafka/services/clients/schema/entities/TagTopicInfo.java b/src/main/java/com/michelin/ns4kafka/services/clients/schema/entities/TagTopicInfo.java
new file mode 100644
index 00000000..891b7f6b
--- /dev/null
+++ b/src/main/java/com/michelin/ns4kafka/services/clients/schema/entities/TagTopicInfo.java
@@ -0,0 +1,20 @@
+package com.michelin.ns4kafka.services.clients.schema.entities;
+
+import lombok.Builder;
+
+/**
+ * Information on a tag attached to a topic.
+ *
+ * @param entityName   The entity name
+ * @param entityType   The entity type
+ * @param typeName     The type name
+ * @param entityStatus The entity status
+ */
+@Builder
+public record TagTopicInfo(String entityName, String entityType, String typeName, String entityStatus) {
+
+    @Override
+    public String toString() {
+        return entityName + "/" + typeName;
+    }
+}
diff --git a/src/main/java/com/michelin/ns4kafka/services/executors/AccessControlEntryAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/services/executors/AccessControlEntryAsyncExecutor.java
index d22ef3ca..a032703f 100644
--- a/src/main/java/com/michelin/ns4kafka/services/executors/AccessControlEntryAsyncExecutor.java
+++ b/src/main/java/com/michelin/ns4kafka/services/executors/AccessControlEntryAsyncExecutor.java
@@ -15,7 +15,6 @@
 import com.michelin.ns4kafka.services.ConnectorService;
 import com.michelin.ns4kafka.services.StreamService;
 import io.micronaut.context.annotation.EachBean;
-import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
 import java.util.ArrayList;
 import java.util.List;
@@ -24,6 +23,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.stream.Stream;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.acl.AclBinding;
@@ -40,26 +40,19 @@
 @Slf4j
 @EachBean(ManagedClusterProperties.class)
 @Singleton
+@AllArgsConstructor
 public class AccessControlEntryAsyncExecutor {
     private static final String USER_PRINCIPAL = "User:";
 
     private final ManagedClusterProperties managedClusterProperties;
 
-    @Inject
-    AccessControlEntryService accessControlEntryService;
+    private AccessControlEntryService accessControlEntryService;
 
-    @Inject
-    StreamService streamService;
+    private StreamService streamService;
 
-    @Inject
-    ConnectorService connectorService;
+    private ConnectorService connectorService;
 
-    @Inject
-    NamespaceRepository namespaceRepository;
-
-    public AccessControlEntryAsyncExecutor(ManagedClusterProperties managedClusterProperties) {
-        this.managedClusterProperties = managedClusterProperties;
-    }
+    private NamespaceRepository namespaceRepository;
 
     /**
      * Run the ACLs synchronization.
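The refactor above (repeated in the executors below) swaps Micronaut field injection for constructor injection via Lombok's `@AllArgsConstructor`. Roughly what Lombok generates — shown as a sketch, not the actual generated source; parameter order follows field declaration order:

    public AccessControlEntryAsyncExecutor(ManagedClusterProperties managedClusterProperties,
                                           AccessControlEntryService accessControlEntryService,
                                           StreamService streamService,
                                           ConnectorService connectorService,
                                           NamespaceRepository namespaceRepository) {
        this.managedClusterProperties = managedClusterProperties;
        this.accessControlEntryService = accessControlEntryService;
        this.streamService = streamService;
        this.connectorService = connectorService;
        this.namespaceRepository = namespaceRepository;
    }

A side benefit is testability: the executors become trivially instantiable without a DI container, which the new `TopicAsyncExecutorTest` exploits through Mockito's `@InjectMocks`.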
diff --git a/src/main/java/com/michelin/ns4kafka/services/executors/ConnectorAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/services/executors/ConnectorAsyncExecutor.java
index 09464a1c..3a06111e 100644
--- a/src/main/java/com/michelin/ns4kafka/services/executors/ConnectorAsyncExecutor.java
+++ b/src/main/java/com/michelin/ns4kafka/services/executors/ConnectorAsyncExecutor.java
@@ -12,13 +12,13 @@
 import com.michelin.ns4kafka.services.clients.connect.entities.ConnectorStatus;
 import io.micronaut.context.annotation.EachBean;
 import io.micronaut.http.client.exceptions.HttpClientResponseException;
-import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Stream;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -29,20 +29,18 @@
 @Slf4j
 @EachBean(ManagedClusterProperties.class)
 @Singleton
+@AllArgsConstructor
 public class ConnectorAsyncExecutor {
-    private final ManagedClusterProperties managedClusterProperties;
     private final Set<String> healthyConnectClusters = new HashSet<>();
     private final Set<String> idleConnectClusters = new HashSet<>();
-    @Inject
+
+    private final ManagedClusterProperties managedClusterProperties;
+
     private ConnectorRepository connectorRepository;
-    @Inject
+
     private KafkaConnectClient kafkaConnectClient;
-    @Inject
-    private ConnectClusterService connectClusterService;
 
-    public ConnectorAsyncExecutor(ManagedClusterProperties managedClusterProperties) {
-        this.managedClusterProperties = managedClusterProperties;
-    }
+    private ConnectClusterService connectClusterService;
 
     /**
      * Run the connector synchronization.
@@ -101,7 +99,6 @@ private Flux<ConnectCluster> checkConnectClusterHealth() {
             });
     }
 
-
     /**
      * For each connect cluster, start the synchronization of connectors.
      */
@@ -141,7 +138,7 @@ private Flux<Connector> synchronizeConnectCluster(String connectCluster) {
                     managedClusterProperties.getName(), connectCluster);
             } else {
                 log.error(
-                    "Exception during connectors synchronization for Kafka cluster {} and Kafka Connect {}: {}.",
+                    "Error during connectors synchronization for Kafka cluster {} and Kafka Connect {}: {}.",
                     managedClusterProperties.getName(), connectCluster, error.getMessage());
             }
         })
diff --git a/src/main/java/com/michelin/ns4kafka/services/executors/ConsumerGroupAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/services/executors/ConsumerGroupAsyncExecutor.java
index 04b2e41f..372728e8 100644
--- a/src/main/java/com/michelin/ns4kafka/services/executors/ConsumerGroupAsyncExecutor.java
+++ b/src/main/java/com/michelin/ns4kafka/services/executors/ConsumerGroupAsyncExecutor.java
@@ -10,6 +10,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
@@ -23,13 +24,10 @@
 @Slf4j
 @EachBean(ManagedClusterProperties.class)
 @Singleton
+@AllArgsConstructor
 public class ConsumerGroupAsyncExecutor {
     private final ManagedClusterProperties managedClusterProperties;
 
-    public ConsumerGroupAsyncExecutor(ManagedClusterProperties managedClusterProperties) {
-        this.managedClusterProperties = managedClusterProperties;
-    }
-
     /**
      * Getter for Kafka Admin client.
      *
diff --git a/src/main/java/com/michelin/ns4kafka/services/executors/KafkaAsyncExecutorScheduler.java b/src/main/java/com/michelin/ns4kafka/services/executors/KafkaAsyncExecutorScheduler.java
index c387d963..233e57d8 100644
--- a/src/main/java/com/michelin/ns4kafka/services/executors/KafkaAsyncExecutorScheduler.java
+++ b/src/main/java/com/michelin/ns4kafka/services/executors/KafkaAsyncExecutorScheduler.java
@@ -18,12 +18,16 @@
 @Singleton
 public class KafkaAsyncExecutorScheduler {
     private final AtomicBoolean ready = new AtomicBoolean(false);
+
     @Inject
     List<TopicAsyncExecutor> topicAsyncExecutors;
+
     @Inject
     List<AccessControlEntryAsyncExecutor> accessControlEntryAsyncExecutors;
+
     @Inject
     List<ConnectorAsyncExecutor> connectorAsyncExecutors;
+
     @Inject
     List<UserAsyncExecutor> userAsyncExecutors;
diff --git a/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java
index c0afee25..3fc416b5 100644
--- a/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java
+++ b/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java
@@ -5,22 +5,27 @@
 import com.michelin.ns4kafka.properties.ManagedClusterProperties;
 import com.michelin.ns4kafka.repositories.TopicRepository;
 import com.michelin.ns4kafka.repositories.kafka.KafkaStoreException;
+import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient;
+import com.michelin.ns4kafka.services.clients.schema.entities.TagTopicInfo;
 import io.micronaut.context.annotation.EachBean;
-import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AlterConfigOp;
@@ -41,15 +46,16 @@
 @Slf4j
 @EachBean(ManagedClusterProperties.class)
 @Singleton
+@AllArgsConstructor
 public class TopicAsyncExecutor {
+    public static final String CLUSTER_ID = "cluster.id";
+    public static final String TOPIC_ENTITY_TYPE = "kafka_topic";
+
     private final ManagedClusterProperties managedClusterProperties;
 
-    @Inject
-    TopicRepository topicRepository;
+    private TopicRepository topicRepository;
 
-    public TopicAsyncExecutor(ManagedClusterProperties managedClusterProperties) {
-        this.managedClusterProperties = managedClusterProperties;
-    }
+    private SchemaRegistryClient schemaRegistryClient;
 
     private Admin getAdminClient() {
         return managedClusterProperties.getAdminClient();
@@ -74,15 +80,15 @@ public void synchronizeTopics() {
             Map<String, Topic> brokerTopics = collectBrokerTopics();
             List<Topic> ns4kafkaTopics = topicRepository.findAllForCluster(managedClusterProperties.getName());
 
-            List<Topic> toCreate = ns4kafkaTopics.stream()
+            List<Topic> createTopics = ns4kafkaTopics.stream()
                 .filter(topic -> !brokerTopics.containsKey(topic.getMetadata().getName()))
                 .toList();
 
-            List<Topic> toCheckConf = ns4kafkaTopics.stream()
+            List<Topic> checkTopics = ns4kafkaTopics.stream()
                 .filter(topic -> brokerTopics.containsKey(topic.getMetadata().getName()))
                 .toList();
 
-            Map<ConfigResource, Collection<AlterConfigOp>> toUpdate = toCheckConf.stream()
+            Map<ConfigResource, Collection<AlterConfigOp>> updateTopics = checkTopics.stream()
                 .map(topic -> {
                     Map<String, String> actualConf =
                         brokerTopics.get(topic.getMetadata().getName()).getSpec().getConfigs();
@@ -99,15 +105,15 @@ public void synchronizeTopics() {
                 .filter(Objects::nonNull)
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
-            if (!toCreate.isEmpty()) {
+            if (!createTopics.isEmpty()) {
                 log.debug("Topic(s) to create: "
-                    + String.join(",", toCreate.stream().map(topic -> topic.getMetadata().getName()).toList()));
+                    + String.join(", ", createTopics.stream().map(topic -> topic.getMetadata().getName()).toList()));
             }
 
-            if (!toUpdate.isEmpty()) {
+            if (!updateTopics.isEmpty()) {
                 log.debug("Topic(s) to update: "
-                    + String.join(",", toUpdate.keySet().stream().map(ConfigResource::name).toList()));
-                for (Map.Entry<ConfigResource, Collection<AlterConfigOp>> e : toUpdate.entrySet()) {
+                    + String.join(", ", updateTopics.keySet().stream().map(ConfigResource::name).toList()));
+                for (Map.Entry<ConfigResource, Collection<AlterConfigOp>> e : updateTopics.entrySet()) {
                     for (AlterConfigOp op : e.getValue()) {
                         log.debug(
                             e.getKey().name() + " " + op.opType().toString() + " " + op.configEntry().name() + "("
@@ -116,25 +122,74 @@ public void synchronizeTopics() {
                 }
             }
 
-            createTopics(toCreate);
-            alterTopics(toUpdate, toCheckConf);
+            createTopics(createTopics);
+            alterTopics(updateTopics, checkTopics);
+
+            if (isConfluentCloud()) {
+                alterTags(checkTopics, brokerTopics);
+            }
         } catch (ExecutionException | TimeoutException | CancellationException | KafkaStoreException e) {
-            log.error("Error", e);
+            log.error("An error occurred during the topic synchronization", e);
         } catch (InterruptedException e) {
-            log.error("Error", e);
+            log.error("Thread interrupted during the topic synchronization", e);
             Thread.currentThread().interrupt();
         }
     }
 
+    /**
+     * Alter tags.
+     *
+     * @param ns4kafkaTopics Topics from Ns4Kafka
+     * @param brokerTopics   Topics from broker
+     */
+    public void alterTags(List<Topic> ns4kafkaTopics, Map<String, Topic> brokerTopics) {
+        List<TagTopicInfo> tagsToCreate = ns4kafkaTopics
+            .stream()
+            .flatMap(topic -> {
+                Topic brokerTopic = brokerTopics.get(topic.getMetadata().getName());
+
+                // Get tags to delete
+                Set<String> existingTags = new HashSet<>(brokerTopic.getSpec().getTags());
+                existingTags.removeAll(Set.copyOf(topic.getSpec().getTags()));
+                deleteTags(existingTags, topic.getMetadata().getName());
+
+                // Get tags to create
+                Set<String> newTags = new HashSet<>(topic.getSpec().getTags());
+                newTags.removeAll(Set.copyOf(brokerTopic.getSpec().getTags()));
+
+                return newTags
+                    .stream()
+                    .map(tag -> TagTopicInfo.builder()
+                        .entityName(managedClusterProperties
+                            .getConfig()
+                            .getProperty(CLUSTER_ID) + ":" + topic.getMetadata().getName())
+                        .typeName(tag)
+                        .entityType(TOPIC_ENTITY_TYPE)
+                        .build());
+            })
+            .toList();
+
+        if (!tagsToCreate.isEmpty()) {
+            createTags(tagsToCreate);
+        }
+    }
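The reconciliation in `alterTags` is a plain two-way set difference between the declared tags and the tags currently on the broker; a standalone sketch of that computation:

    import java.util.HashSet;
    import java.util.Set;

    class TagDiffSketch {
        public static void main(String[] args) {
            Set<String> brokerTags = Set.of("PII", "GDPR");      // tags on the broker
            Set<String> declaredTags = Set.of("PII", "FINANCE"); // tags in Ns4Kafka

            Set<String> toDelete = new HashSet<>(brokerTags);
            toDelete.removeAll(declaredTags);                    // [GDPR]

            Set<String> toCreate = new HashSet<>(declaredTags);
            toCreate.removeAll(brokerTags);                      // [FINANCE]

            System.out.println(toDelete + " " + toCreate);
        }
    }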
+
     /**
      * Delete a topic.
      *
      * @param topic The topic to delete
      */
     public void deleteTopic(Topic topic) throws InterruptedException, ExecutionException, TimeoutException {
-        getAdminClient().deleteTopics(List.of(topic.getMetadata().getName())).all().get(30, TimeUnit.SECONDS);
+        getAdminClient().deleteTopics(List.of(topic.getMetadata().getName()))
+            .all()
+            .get(30, TimeUnit.SECONDS);
+
         log.info("Success deleting topic {} on {}", topic.getMetadata().getName(),
-            this.managedClusterProperties.getName());
+            managedClusterProperties.getName());
+
+        if (isConfluentCloud() && !topic.getSpec().getTags().isEmpty()) {
+            deleteTags(topic.getSpec().getTags(), topic.getMetadata().getName());
+        }
     }
 
     /**
@@ -159,6 +214,33 @@ public List<String> listBrokerTopicNames() throws InterruptedException, Executio
             .toList();
     }
 
+    /**
+     * Enrich topics with Confluent tags.
+     *
+     * @param topics Topics to enrich
+     */
+    public void enrichWithTags(Map<String, Topic> topics) {
+        if (isConfluentCloud()) {
+            topics.forEach((key, value) -> {
+                List<TagTopicInfo> tags = schemaRegistryClient.getTopicWithTags(managedClusterProperties.getName(),
+                    managedClusterProperties.getConfig().getProperty(CLUSTER_ID)
+                        + ":" + value.getMetadata().getName()).block();
+
+                value.getSpec().setTags(tags != null ? tags.stream().map(TagTopicInfo::typeName).toList() :
+                    Collections.emptyList());
+            });
+        }
+    }
+
+    /**
+     * Check if the current cluster is Confluent Cloud.
+     *
+     * @return true if it is, false otherwise
+     */
+    public boolean isConfluentCloud() {
+        return managedClusterProperties.getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD);
+    }
+
     /**
      * Collect all topics on broker from a list of topic names.
      *
@@ -173,7 +255,7 @@ public Map<String, Topic> collectBrokerTopicsFromNames(List<String> topicNames)
         Map<String, TopicDescription> topicDescriptions = getAdminClient().describeTopics(topicNames)
             .allTopicNames().get();
 
-        return getAdminClient()
+        Map<String, Topic> topics = getAdminClient()
             .describeConfigs(topicNames.stream()
                 .map(s -> new ConfigResource(ConfigResource.Type.TOPIC, s))
                 .toList())
@@ -204,13 +286,27 @@ public Map<String, Topic> collectBrokerTopicsFromNames(List<String> topicNames)
                     .build()
             )
             .collect(Collectors.toMap(topic -> topic.getMetadata().getName(), Function.identity()));
+
+        enrichWithTags(topics);
+
+        return topics;
     }
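Both `enrichWithTags` and the tag creation/deletion calls identify a topic in the Confluent catalog as `<cluster.id>:<topic name>`. A tiny illustration, reusing the sample `cluster.id` from the README (both values are placeholders):

    String clusterId = "lkc-abcde";                   // from config.cluster.id
    String topicName = "myPrefix.orders";             // a hypothetical topic
    String entityName = clusterId + ":" + topicName;  // "lkc-abcde:myPrefix.orders"
    // This is the entityName handed to getTopicWithTags() and deleteTag().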
+
+    /**
+     * Alter topics.
+     *
+     * @param toUpdate The topics to update
+     * @param topics   The current topics
+     */
     private void alterTopics(Map<ConfigResource, Collection<AlterConfigOp>> toUpdate, List<Topic> topics) {
         AlterConfigsResult alterConfigsResult = getAdminClient().incrementalAlterConfigs(toUpdate);
         alterConfigsResult.values().forEach((key, value) -> {
-            Topic updatedTopic =
-                topics.stream().filter(t -> t.getMetadata().getName().equals(key.name())).findFirst().get();
+            Topic updatedTopic = topics
+                .stream()
+                .filter(t -> t.getMetadata().getName().equals(key.name()))
+                .findFirst()
+                .get();
+
             try {
                 value.get(10, TimeUnit.SECONDS);
                 updatedTopic.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
@@ -220,7 +316,7 @@ private void alterTopics(Map<ConfigResource, Collection<AlterConfigOp>> toUpdate
                 log.info("Success updating topic configs {} on {}: [{}]",
                     key.name(),
                     managedClusterProperties.getName(),
-                    toUpdate.get(key).stream().map(AlterConfigOp::toString).collect(Collectors.joining(",")));
+                    toUpdate.get(key).stream().map(AlterConfigOp::toString).collect(Collectors.joining(", ")));
             } catch (InterruptedException e) {
                 log.error("Error", e);
                 Thread.currentThread().interrupt();
@@ -228,12 +324,17 @@ private void alterTopics(Map<ConfigResource, Collection<AlterConfigOp>> toUpdate
                 updatedTopic.setStatus(
                     Topic.TopicStatus.ofFailed("Error while updating topic configs: " + e.getMessage()));
                 log.error(String.format("Error while updating topic configs %s on %s", key.name(),
-                    this.managedClusterProperties.getName()), e);
+                    managedClusterProperties.getName()), e);
             }
             topicRepository.create(updatedTopic);
         });
     }
 
+    /**
+     * Create topics.
+     *
+     * @param topics The topics to create
+     */
     private void createTopics(List<Topic> topics) {
         List<NewTopic> newTopics = topics.stream()
             .map(topic -> {
@@ -241,33 +342,81 @@ private void createTopics(List<Topic> topics) {
                 NewTopic newTopic = new NewTopic(topic.getMetadata().getName(), topic.getSpec().getPartitions(),
                     (short) topic.getSpec().getReplicationFactor());
                 newTopic.configs(topic.getSpec().getConfigs());
-                log.debug("{}", newTopic);
                 return newTopic;
             })
             .toList();
 
         CreateTopicsResult createTopicsResult = getAdminClient().createTopics(newTopics);
         createTopicsResult.values().forEach((key, value) -> {
-            Topic createdTopic = topics.stream().filter(t -> t.getMetadata().getName().equals(key)).findFirst().get();
+            Topic createdTopic = topics
+                .stream()
+                .filter(t -> t.getMetadata().getName().equals(key))
+                .findFirst()
+                .get();
+
             try {
                 value.get(10, TimeUnit.SECONDS);
                 createdTopic.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
                 createdTopic.getMetadata().setGeneration(1);
                 createdTopic.setStatus(Topic.TopicStatus.ofSuccess("Topic created"));
-                log.info("Success creating topic {} on {}", key, this.managedClusterProperties.getName());
+                log.info("Success creating topic {} on {}", key, managedClusterProperties.getName());
             } catch (InterruptedException e) {
                 log.error("Error", e);
                 Thread.currentThread().interrupt();
             } catch (Exception e) {
                 createdTopic.setStatus(Topic.TopicStatus.ofFailed("Error while creating topic: " + e.getMessage()));
                 log.error(
-                    String.format("Error while creating topic %s on %s", key, this.managedClusterProperties.getName()),
+                    String.format("Error while creating topic %s on %s", key, managedClusterProperties.getName()),
                     e);
             }
             topicRepository.create(createdTopic);
         });
     }
+
+    /**
+     * Create tags.
+     *
+     * @param tagsToCreate The tags to create
+     */
+    private void createTags(List<TagTopicInfo> tagsToCreate) {
+        String stringTags = String.join(", ", tagsToCreate
+            .stream()
+            .map(Record::toString)
+            .toList());
+
+        schemaRegistryClient.addTags(managedClusterProperties.getName(), tagsToCreate)
+            .subscribe(success -> log.info(String.format("Success creating tag %s.", stringTags)),
+                error -> log.error(String.format("Error creating tag %s.", stringTags), error));
+    }
+
+    /**
+     * Delete tags.
+     *
+     * @param tagsToDelete The tags to delete
+     * @param topicName    The topic name
+     */
+    private void deleteTags(Collection<String> tagsToDelete, String topicName) {
+        tagsToDelete
+            .forEach(tag -> schemaRegistryClient.deleteTag(managedClusterProperties.getName(),
+                    managedClusterProperties.getConfig().getProperty(CLUSTER_ID)
+                        + ":" + topicName, tag)
+                .subscribe(success -> log.info(String.format("Success deleting tag %s.",
+                        managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":"
+                            + topicName
+                            + "/" + tag)),
+                    error -> log.error(String.format("Error deleting tag %s.",
+                        managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":"
+                            + topicName
+                            + "/" + tag), error)));
+    }
+
+    /**
+     * Compute the configuration changes.
+     *
+     * @param expected The config from Ns4Kafka
+     * @param actual   The config from the cluster
+     * @return A list of config changes
+     */
     private Collection<AlterConfigOp> computeConfigChanges(Map<String, String> expected, Map<String, String> actual) {
         List<AlterConfigOp> toCreate = expected.entrySet()
             .stream()
diff --git a/src/main/java/com/michelin/ns4kafka/services/executors/UserAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/services/executors/UserAsyncExecutor.java
index 119c736e..2b3f6296 100644
--- a/src/main/java/com/michelin/ns4kafka/services/executors/UserAsyncExecutor.java
+++ b/src/main/java/com/michelin/ns4kafka/services/executors/UserAsyncExecutor.java
@@ -137,7 +137,7 @@ private Map<String, Map<String, Double>> collectNs4kafkaQuotas() {
                 .stream()
                 .filter(q -> q.getKey().startsWith(USER_QUOTA_PREFIX))
                 .forEach(q -> userQuota.put(
-                    q.getKey().replaceAll(USER_QUOTA_PREFIX, ""),
+                    q.getKey().replace(USER_QUOTA_PREFIX, ""),
                     Double.parseDouble(q.getValue()))));
 
             return Map.entry(namespace.getSpec().getKafkaUser(), userQuota);
@@ -165,7 +165,7 @@ static class Scram512UserSynchronizer implements AbstractUserSynchronizer {
         private final ScramCredentialInfo info = new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 4096);
         private final SecureRandom secureRandom = new SecureRandom();
 
-        private Admin admin;
+        private final Admin admin;
 
         public Scram512UserSynchronizer(Admin admin) {
             this.admin = admin;
@@ -273,4 +273,4 @@ public Map<String, Map<String, Double>> listQuotas() {
             throw exception;
         }
     }
-}
\ No newline at end of file
+}
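The `replaceAll` → `replace` change above is a correctness fix, not a rename: `String.replaceAll` compiles its first argument as a regular expression, so any regex metacharacter in the quota prefix matches more than intended, while `replace` substitutes the literal string. A sketch with a made-up prefix (the real `USER_QUOTA_PREFIX` value is not shown in this diff):

    class ReplaceVsReplaceAllSketch {
        public static void main(String[] args) {
            String prefix = "user_quota.";  // hypothetical prefix containing a '.'
            // Literal replacement: only the exact prefix is stripped.
            System.out.println("user_quota.producer_byte_rate".replace(prefix, ""));
            // Regex replacement: '.' matches any character, so an unrelated key
            // such as "user_quotaXproducer_byte_rate" is also stripped.
            System.out.println("user_quotaXproducer_byte_rate".replaceAll(prefix, ""));
        }
    }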
.name("test") + .cluster("local") + .build()) + .spec(NamespaceSpec.builder() + .topicValidator(TopicValidator.makeDefault()) + .build()) + .build(); + + Topic existing = Topic.builder() + .metadata(ObjectMeta.builder() + .name("test.topic") + .build()) + .spec(Topic.TopicSpec.builder() + .replicationFactor(3) + .partitions(3) + .tags(Arrays.asList("TAG1", "TAG3")) + .configs(Map.of("cleanup.policy", "compact", + "min.insync.replicas", "2", + "retention.ms", "60000")) + .build()) + .build(); + + Topic topic = Topic.builder() + .metadata(ObjectMeta.builder() + .name("test.topic") + .build()) + .spec(Topic.TopicSpec.builder() + .tags(Arrays.asList("TAG1", "TAG2")) + .replicationFactor(3) + .partitions(3) + .configs(Map.of("cleanup.policy", "delete", + "min.insync.replicas", "2", + "retention.ms", "60000")) + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + when(topicService.findByName(ns, "test.topic")).thenReturn(Optional.of(existing)); + when(topicService.validateTags(ns, topic)).thenReturn(List.of("Error on tags")); + + ResourceValidationException actual = + assertThrows(ResourceValidationException.class, () -> topicController.apply("test", topic, false)); + assertEquals(1, actual.getValidationErrors().size()); + assertLinesMatch(List.of("Error on tags"), actual.getValidationErrors()); + } + + @Test + void shouldNotValidateTagsWhenNoNewTag() throws InterruptedException, ExecutionException, TimeoutException { + Namespace ns = Namespace.builder() + .metadata(ObjectMeta.builder() + .name("test") + .cluster("local") + .build()) + .spec(NamespaceSpec.builder() + .topicValidator(TopicValidator.makeDefault()) + .build()) + .build(); + + Topic existing = Topic.builder() + .metadata(ObjectMeta.builder() + .name("test.topic") + .build()) + .spec(Topic.TopicSpec.builder() + .tags(Arrays.asList("TAG1", "TAG2")) + .replicationFactor(3) + .partitions(3) + .configs(Map.of("cleanup.policy", "compact", + "min.insync.replicas", "2", + "retention.ms", "60000")) + .build()) + .build(); + + Topic topic = Topic.builder() + .metadata(ObjectMeta.builder() + .name("test.topic") + .build()) + .spec(Topic.TopicSpec.builder() + .tags(List.of("TAG1")) + .replicationFactor(3) + .partitions(3) + .configs(Map.of("cleanup.policy", "delete", + "min.insync.replicas", "2", + "retention.ms", "60000")) + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + when(topicService.findByName(ns, "test.topic")).thenReturn(Optional.of(existing)); + when(topicService.create(topic)).thenReturn(topic); + when(securityService.username()).thenReturn(Optional.of("test-user")); + when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); + doNothing().when(applicationEventPublisher).publishEvent(any()); + + var response = topicController.apply("test", topic, false); + Topic actual = response.body(); + assertEquals("changed", response.header("X-Ns4kafka-Result")); + assertEquals("test.topic", actual.getMetadata().getName()); + assertEquals(1, actual.getSpec().getTags().size()); + assertEquals("TAG1", actual.getSpec().getTags().get(0)); + } + + /** + * Validate topic update when there are validations errors. 
+     */
     @Test
     void updateTopicValidationErrors() {
         Namespace ns = Namespace.builder()
diff --git a/src/test/java/com/michelin/ns4kafka/integration/UserTest.java b/src/test/java/com/michelin/ns4kafka/integration/UserTest.java
index 6ced99fa..7022e4e7 100644
--- a/src/test/java/com/michelin/ns4kafka/integration/UserTest.java
+++ b/src/test/java/com/michelin/ns4kafka/integration/UserTest.java
@@ -115,9 +115,7 @@ void init() {
         client.toBlocking()
             .exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces").bearerAuth(token).body(ns3));
 
-        //force User Sync
         userAsyncExecutors.forEach(UserAsyncExecutor::run);
-
     }
 
     @Test
diff --git a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java
index a49a3d9e..fa277b11 100644
--- a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java
+++ b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java
@@ -13,6 +13,8 @@
 import com.michelin.ns4kafka.models.Topic;
 import com.michelin.ns4kafka.properties.ManagedClusterProperties;
 import com.michelin.ns4kafka.repositories.TopicRepository;
+import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient;
+import com.michelin.ns4kafka.services.clients.schema.entities.TagInfo;
 import com.michelin.ns4kafka.services.executors.TopicAsyncExecutor;
 import io.micronaut.context.ApplicationContext;
 import io.micronaut.inject.qualifiers.Qualifiers;
@@ -31,6 +33,7 @@
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
+import reactor.core.publisher.Mono;
 
 @ExtendWith(MockitoExtension.class)
 class TopicServiceTest {
@@ -49,6 +52,12 @@ class TopicServiceTest {
     @Mock
     List<ManagedClusterProperties> managedClusterProperties;
 
+    @Mock
+    SchemaRegistryClient schemaRegistryClient;
+
+    /**
+     * Validate find topic by name.
+     */
     @Test
     void findByName() {
         Namespace ns = Namespace.builder()
@@ -874,4 +883,106 @@ void findAll() {
         List<Topic> topics = topicService.findAll();
         assertEquals(4, topics.size());
     }
+
+    @Test
+    void shouldTagsBeValid() {
+        Namespace ns = Namespace.builder()
+            .metadata(ObjectMeta.builder()
+                .name("namespace")
+                .cluster("local")
+                .build())
+            .build();
+
+        Topic topic = Topic.builder()
+            .metadata(ObjectMeta.builder().name("ns-topic1").build())
+            .spec(Topic.TopicSpec.builder()
+                .tags(List.of("TAG_TEST")).build())
+            .build();
+
+        List<TagInfo> tagInfo = List.of(TagInfo.builder().name("TAG_TEST").build());
+
+        when(managedClusterProperties.stream()).thenReturn(Stream.of(
+            new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)));
+        when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(tagInfo));
+
+        List<String> validationErrors = topicService.validateTags(ns, topic);
+        assertEquals(0, validationErrors.size());
+    }
+
+    @Test
+    void shouldTagsBeInvalidWhenNotConfluentCloud() {
+        Namespace ns = Namespace.builder()
+            .metadata(ObjectMeta.builder()
+                .name("namespace")
+                .cluster("local")
+                .build())
+            .build();
+
+        Topic topic = Topic.builder()
+            .metadata(ObjectMeta.builder().name("ns-topic1").build())
+            .spec(Topic.TopicSpec.builder()
+                .tags(List.of("TAG_TEST")).build())
+            .build();
+
+        when(managedClusterProperties.stream()).thenReturn(Stream.of(
+            new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.SELF_MANAGED)));
+
+        List<String> validationErrors = topicService.validateTags(ns, topic);
+        assertEquals(1, validationErrors.size());
+        assertEquals("Invalid value TAG_TEST for tags: Tags are not currently supported.", validationErrors.get(0));
+    }
+
+    @Test
+    void shouldTagsBeInvalidWhenNoTagsAllowed() {
+        Namespace ns = Namespace.builder()
+            .metadata(ObjectMeta.builder()
+                .name("namespace")
+                .cluster("local")
+                .build())
+            .build();
+
+        Topic topic = Topic.builder()
+            .metadata(ObjectMeta.builder().name("ns-topic1").build())
+            .spec(Topic.TopicSpec.builder()
+                .tags(List.of("TAG_TEST")).build())
+            .build();
+
+        when(managedClusterProperties.stream()).thenReturn(Stream.of(
+            new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)));
+        when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(Collections.emptyList()));
+
+        List<String> validationErrors = topicService.validateTags(ns, topic);
+        assertEquals(1, validationErrors.size());
+        assertEquals(
+            "Invalid value TAG_TEST for tags: No tags allowed.",
+            validationErrors.get(0));
+    }
+
+    @Test
+    void shouldTagsBeInvalidWhenNotAllowed() {
+        Namespace ns = Namespace.builder()
+            .metadata(ObjectMeta.builder()
+                .name("namespace")
+                .cluster("local")
+                .build())
+            .build();
+
+        Topic topic = Topic.builder()
+            .metadata(ObjectMeta.builder().name("ns-topic1").build())
+            .spec(Topic.TopicSpec.builder()
+                .tags(List.of("BAD_TAG", "TAG_TEST")).build())
+            .build();
+
+        List<TagInfo> tagInfo = List.of(TagInfo.builder().name("TAG_TEST").build());
+
+        when(managedClusterProperties.stream())
+            .thenReturn(Stream.of(
+                new ManagedClusterProperties("local",
+                    ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)));
+        when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(tagInfo));
+
+        List<String> validationErrors = topicService.validateTags(ns, topic);
+        assertEquals(1, validationErrors.size());
+        assertEquals("Invalid value BAD_TAG for tags: Available tags are TAG_TEST.", validationErrors.get(0));
+    }
 }
diff --git a/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java
new file mode 100644
index 00000000..0f4e1686
--- /dev/null
+++ b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java
@@ -0,0 +1,289 @@
+package com.michelin.ns4kafka.services.executors;
+
+import static com.michelin.ns4kafka.services.executors.TopicAsyncExecutor.CLUSTER_ID;
+import static com.michelin.ns4kafka.services.executors.TopicAsyncExecutor.TOPIC_ENTITY_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.michelin.ns4kafka.models.ObjectMeta;
+import com.michelin.ns4kafka.models.Topic;
+import com.michelin.ns4kafka.properties.ManagedClusterProperties;
+import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient;
+import com.michelin.ns4kafka.services.clients.schema.entities.TagTopicInfo;
+import io.micronaut.http.HttpResponse;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.common.KafkaFuture;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import reactor.core.publisher.Mono;
+
+@ExtendWith(MockitoExtension.class)
+class TopicAsyncExecutorTest {
+    private static final String CLUSTER_ID_TEST = "cluster_id_test";
+    private static final String LOCAL_CLUSTER = "local";
+    private static final String TOPIC_NAME = "topic";
+    private static final String TAG1 = "TAG1";
+    private static final String TAG2 = "TAG2";
+    private static final String TAG3 = "TAG3";
+
+    @Mock
+    SchemaRegistryClient schemaRegistryClient;
+
+    @Mock
+    ManagedClusterProperties managedClusterProperties;
+
+    @Mock
+    Admin adminClient;
+
+    @Mock
+    DeleteTopicsResult deleteTopicsResult;
+
+    @Mock
+    KafkaFuture<Void> kafkaFuture;
+
+    @InjectMocks
+    TopicAsyncExecutor topicAsyncExecutor;
+
+    @Test
+    void shouldDeleteTagsAndNotCreateIfEmpty() {
+        Properties properties = new Properties();
+        properties.put(CLUSTER_ID, CLUSTER_ID_TEST);
+
+        when(schemaRegistryClient.deleteTag(anyString(),
+            anyString(), anyString()))
+            .thenReturn(Mono.empty())
+            .thenReturn(Mono.error(new Exception("error")));
+        when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER);
+        when(managedClusterProperties.getConfig()).thenReturn(properties);
+
+        List<Topic> ns4kafkaTopics = List.of(
+            Topic.builder()
+                .metadata(ObjectMeta.builder()
+                    .name(TOPIC_NAME)
+                    .build())
+                .spec(Topic.TopicSpec.builder()
+                    .tags(List.of(TAG1))
+                    .build())
+                .build());
+
+        Map<String, Topic> brokerTopics = Map.of(TOPIC_NAME,
+            Topic.builder()
+                .metadata(ObjectMeta.builder()
+                    .name(TOPIC_NAME)
+                    .build())
+                .spec(Topic.TopicSpec.builder()
+                    .tags(List.of(TAG1, TAG2, TAG3))
+                    .build())
+                .build());
+
+        topicAsyncExecutor.alterTags(ns4kafkaTopics, brokerTopics);
+
+        verify(schemaRegistryClient).deleteTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG2);
+        verify(schemaRegistryClient).deleteTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG3);
+    }
+
+    @Test
+    void shouldCreateTags() {
+        Properties properties = new Properties();
+        properties.put(CLUSTER_ID, CLUSTER_ID_TEST);
+
+        when(schemaRegistryClient.addTags(anyString(), anyList()))
+            .thenReturn(Mono.empty())
+            .thenReturn(Mono.error(new Exception("error")));
+        when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER);
+        when(managedClusterProperties.getConfig()).thenReturn(properties);
+
+        List<Topic> ns4kafkaTopics = List.of(
+            Topic.builder()
+                .metadata(ObjectMeta.builder()
+                    .name(TOPIC_NAME)
+                    .build())
+                .spec(Topic.TopicSpec.builder()
+                    .tags(List.of(TAG1))
+                    .build())
+                .build());
+
+        Map<String, Topic> brokerTopics = Map.of(TOPIC_NAME,
+            Topic.builder()
+                .metadata(ObjectMeta.builder()
+                    .name(TOPIC_NAME)
+                    .build())
+                .spec(Topic.TopicSpec.builder()
+                    .build())
+                .build());
+
+        topicAsyncExecutor.alterTags(ns4kafkaTopics, brokerTopics);
+
+        verify(schemaRegistryClient).addTags(eq(LOCAL_CLUSTER), argThat(tags ->
+            tags.get(0).entityName().equals(CLUSTER_ID_TEST + ":" + TOPIC_NAME)
+                && tags.get(0).typeName().equals(TAG1)
+                && tags.get(0).entityType().equals(TOPIC_ENTITY_TYPE)));
+    }
+
+    @Test
+    void shouldBeConfluentCloud() {
+        when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD);
+        assertTrue(topicAsyncExecutor.isConfluentCloud());
+    }
+
+    @Test
+    void shouldDeleteTopicNoTags() throws ExecutionException, InterruptedException, TimeoutException {
+        when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD);
+        when(deleteTopicsResult.all()).thenReturn(kafkaFuture);
+        when(adminClient.deleteTopics(anyList())).thenReturn(deleteTopicsResult);
+        when(managedClusterProperties.getAdminClient()).thenReturn(adminClient);
+
+        Topic topic = Topic.builder()
+            .metadata(ObjectMeta.builder()
+                .name(TOPIC_NAME)
+                .build())
+            .spec(Topic.TopicSpec.builder()
+                .build())
+            .build();
+
+        topicAsyncExecutor.deleteTopic(topic);
+
+        verify(schemaRegistryClient, never()).deleteTag(any(), any(), any());
+    }
+
+    @Test
+    void shouldDeleteTopicSelfManagedCluster() throws ExecutionException, InterruptedException, TimeoutException {
+        when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.SELF_MANAGED);
+        when(deleteTopicsResult.all()).thenReturn(kafkaFuture);
+        when(adminClient.deleteTopics(anyList())).thenReturn(deleteTopicsResult);
+        when(managedClusterProperties.getAdminClient()).thenReturn(adminClient);
+
+        Topic topic = Topic.builder()
+            .metadata(ObjectMeta.builder()
+                .name(TOPIC_NAME)
+                .build())
+            .spec(Topic.TopicSpec.builder()
+                .build())
+            .build();
+
+        topicAsyncExecutor.deleteTopic(topic);
+
+        verify(schemaRegistryClient, never()).deleteTag(any(), any(), any());
+    }
+
+    @Test
+    void shouldDeleteTopicAndTags() throws ExecutionException, InterruptedException, TimeoutException {
+        Properties properties = new Properties();
+        properties.put(CLUSTER_ID, CLUSTER_ID_TEST);
+
+        when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD);
+        when(deleteTopicsResult.all()).thenReturn(kafkaFuture);
+        when(adminClient.deleteTopics(anyList())).thenReturn(deleteTopicsResult);
+        when(managedClusterProperties.getAdminClient()).thenReturn(adminClient);
+        when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER);
+        when(managedClusterProperties.getConfig()).thenReturn(properties);
+        when(schemaRegistryClient.deleteTag(anyString(),
+            anyString(), anyString()))
+            .thenReturn(Mono.just(HttpResponse.ok()))
+            .thenReturn(Mono.error(new Exception("error")));
+
+        Topic topic = Topic.builder()
+            .metadata(ObjectMeta.builder()
+                .name(TOPIC_NAME)
+                .build())
+            .spec(Topic.TopicSpec.builder()
+                .tags(List.of(TAG1))
+                .build())
+            .build();
+
+        topicAsyncExecutor.deleteTopic(topic);
+
+        verify(schemaRegistryClient).deleteTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG1);
+    }
+
+    @Test
+    void shouldNotEnrichWithTagsWhenNotConfluentCloud() {
+        when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.SELF_MANAGED);
+
+        Map<String, Topic> brokerTopics = Map.of(TOPIC_NAME,
+            Topic.builder()
+                .metadata(ObjectMeta.builder()
+                    .name(TOPIC_NAME)
+                    .build())
+                .spec(Topic.TopicSpec.builder()
+                    .build())
+                .build());
+
+        topicAsyncExecutor.enrichWithTags(brokerTopics);
+
+        assertTrue(brokerTopics.get(TOPIC_NAME).getSpec().getTags().isEmpty());
+    }
+
+    @Test
+    void shouldEnrichWithTagsWhenConfluentCloud() {
+        Properties properties = new Properties();
+        properties.put(CLUSTER_ID, CLUSTER_ID_TEST);
+
+        when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD);
+        when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER);
+        when(managedClusterProperties.getConfig()).thenReturn(properties);
+
+        TagTopicInfo tagTopicInfo = TagTopicInfo.builder()
+            .typeName("typeName")
+            .build();
+
+        when(schemaRegistryClient.getTopicWithTags(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME))
+            .thenReturn(Mono.just(List.of(tagTopicInfo)));
+
+        Map<String, Topic> brokerTopics = Map.of(TOPIC_NAME,
+            Topic.builder()
+                .metadata(ObjectMeta.builder()
+                    .name(TOPIC_NAME)
+                    .build())
+                .spec(Topic.TopicSpec.builder()
+                    .build())
+                .build());
+
+        topicAsyncExecutor.enrichWithTags(brokerTopics);
+
+        assertEquals("typeName", brokerTopics.get(TOPIC_NAME).getSpec().getTags().get(0));
+    }
+
+    @Test
+    void shouldEnrichWithTagsWhenConfluentCloudAndResponseIsNull() {
+        Properties properties = new Properties();
+        properties.put(CLUSTER_ID, CLUSTER_ID_TEST);
+
+        when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD);
+        when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER);
+        when(managedClusterProperties.getConfig()).thenReturn(properties);
+
+        when(schemaRegistryClient.getTopicWithTags(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME))
+            .thenReturn(Mono.empty());
+
+        Map<String, Topic> brokerTopics = Map.of(TOPIC_NAME,
+            Topic.builder()
+                .metadata(ObjectMeta.builder()
+                    .name(TOPIC_NAME)
+                    .build())
+                .spec(Topic.TopicSpec.builder()
+                    .build())
+                .build());
+
+        topicAsyncExecutor.enrichWithTags(brokerTopics);
+
+        assertTrue(brokerTopics.get(TOPIC_NAME).getSpec().getTags().isEmpty());
+    }
+}