Manage Confluent tags
adriencalime committed Oct 3, 2023
1 parent 2767d2e commit ea6c41e
Showing 12 changed files with 213 additions and 93 deletions.
@@ -21,7 +21,12 @@
import jakarta.inject.Inject;
import jakarta.validation.Valid;
import java.time.Instant;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -104,8 +109,10 @@ public HttpResponse<Topic> apply(String namespace, @Valid @Body Topic topic,
validationErrors.addAll(topicService.validateTopicUpdate(ns, existingTopic.get(), topic));
}

-List<String> existingTags = existingTopic.isPresent() && existingTopic.get().getSpec().getTags() != null ? existingTopic.get().getSpec().getTags() : Collections.emptyList();
-if(topic.getSpec().getTags().stream().anyMatch(newTag -> !existingTags.contains(newTag))) {
+List<String> existingTags = existingTopic.isPresent() && existingTopic.get().getSpec().getTags() != null
+? existingTopic.get().getSpec().getTags()
+: Collections.emptyList();
+if (topic.getSpec().getTags().stream().anyMatch(newTag -> !existingTags.contains(newTag))) {
validationErrors.addAll(topicService.validateTags(ns, topic));
}

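The condition above re-validates only tags that are not already on the existing topic: removing a tag or resubmitting an unchanged tag list skips validateTags() entirely. A minimal standalone sketch of that check (simplified names; not the actual controller wiring):

import java.util.Collections;
import java.util.List;

class NewTagCheckSketch {
    static boolean hasNewTags(List<String> existingTags, List<String> requestedTags) {
        // Mirrors the controller condition: does any requested tag not already exist on the topic?
        return requestedTags.stream().anyMatch(tag -> !existingTags.contains(tag));
    }

    public static void main(String[] args) {
        List<String> existing = List.of("PII");
        System.out.println(hasNewTags(existing, List.of("PII")));          // false: unchanged
        System.out.println(hasNewTags(existing, Collections.emptyList())); // false: removal only
        System.out.println(hasNewTags(existing, List.of("PII", "GDPR"))); // true: "GDPR" is new
    }
}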
29 changes: 20 additions & 9 deletions src/main/java/com/michelin/ns4kafka/services/TopicService.java
@@ -16,8 +16,12 @@
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
-
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -322,7 +326,8 @@ public Map<TopicPartition, Long> deleteRecords(Topic topic, Map<TopicPartition,
}

/**
- * Validate tags for topic
+ * Validate tags for topic.
+ *
* @param namespace The namespace
* @param topic The topic which contains tags
* @return A list of validation errors
@@ -335,21 +340,27 @@ public List<String> validateTags(Namespace namespace, Topic topic) {
.filter(cluster -> namespace.getMetadata().getCluster().equals(cluster.getName()))
.findFirst();

-if(topicCluster.isPresent() && !topicCluster.get().getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)) {
-validationErrors.add(String.format("Invalid value (%s) for tags: Tags are not currently supported.", String.join(",", topic.getSpec().getTags())));
+if (topicCluster.isPresent()
+&& !topicCluster.get().getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)) {
+validationErrors.add(String.format(
+"Invalid value (%s) for tags: Tags are not currently supported.",
+String.join(",", topic.getSpec().getTags())));
return validationErrors;
}

Set<String> tagNames = schemaRegistryClient.getTags(namespace.getMetadata().getCluster())
.map(tags -> tags.stream().map(TagInfo::name).collect(Collectors.toSet())).block();

-if(tagNames == null || tagNames.isEmpty()) {
-validationErrors.add(String.format("Invalid value (%s) for tags: No tags allowed.", String.join(",", topic.getSpec().getTags())));
+if (tagNames == null || tagNames.isEmpty()) {
+validationErrors.add(String.format(
+"Invalid value (%s) for tags: No tags allowed.",
+String.join(",", topic.getSpec().getTags())));
return validationErrors;
}

-if(!tagNames.containsAll(topic.getSpec().getTags())) {
-validationErrors.add(String.format("Invalid value (%s) for tags: Available tags are (%s).",
+if (!tagNames.containsAll(topic.getSpec().getTags())) {
+validationErrors.add(String.format(
+"Invalid value (%s) for tags: Available tags are (%s).",
String.join(",", topic.getSpec().getTags()), String.join(",", tagNames)));
}

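validateTags() above applies three checks in order: reject tags outright on non-Confluent-Cloud clusters, reject them when the registry catalog defines no tags at all, and otherwise require every requested tag to exist in the catalog. A standalone sketch of that ladder, with the namespace/cluster lookup and the registry call replaced by plain parameters (hypothetical names):

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class ValidateTagsSketch {
    static List<String> validateTags(boolean confluentCloud, Set<String> definedTags, List<String> requested) {
        List<String> errors = new ArrayList<>();
        if (!confluentCloud) {
            // Tags are a Confluent Cloud catalog feature; other providers are rejected outright.
            errors.add(String.format("Invalid value (%s) for tags: Tags are not currently supported.",
                    String.join(",", requested)));
            return errors;
        }
        if (definedTags == null || definedTags.isEmpty()) {
            // No tag definitions exist in the catalog, so nothing can be assigned.
            errors.add(String.format("Invalid value (%s) for tags: No tags allowed.",
                    String.join(",", requested)));
            return errors;
        }
        if (!definedTags.containsAll(requested)) {
            // Every requested tag must be one of the defined tags.
            errors.add(String.format("Invalid value (%s) for tags: Available tags are (%s).",
                    String.join(",", requested), String.join(",", definedTags)));
        }
        return errors;
    }
}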
@@ -1,7 +1,14 @@
package com.michelin.ns4kafka.services.clients.schema;

import com.michelin.ns4kafka.properties.ManagedClusterProperties;
-import com.michelin.ns4kafka.services.clients.schema.entities.*;
+import com.michelin.ns4kafka.services.clients.schema.entities.SchemaCompatibilityCheckResponse;
+import com.michelin.ns4kafka.services.clients.schema.entities.SchemaCompatibilityRequest;
+import com.michelin.ns4kafka.services.clients.schema.entities.SchemaCompatibilityResponse;
+import com.michelin.ns4kafka.services.clients.schema.entities.SchemaRequest;
+import com.michelin.ns4kafka.services.clients.schema.entities.SchemaResponse;
+import com.michelin.ns4kafka.services.clients.schema.entities.TagInfo;
+import com.michelin.ns4kafka.services.clients.schema.entities.TagSpecs;
+import com.michelin.ns4kafka.services.clients.schema.entities.TagTopicInfo;
import com.michelin.ns4kafka.utils.exceptions.ResourceValidationException;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.StringUtils;
@@ -170,59 +177,80 @@ public Mono<SchemaCompatibilityResponse> deleteCurrentCompatibilityBySubject(Str
}

/**
- * List tags
+ * List tags.
+ *
* @param kafkaCluster The Kafka cluster
* @return A list of tags
*/
public Mono<List<TagInfo>> getTags(String kafkaCluster) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/types/tagdefs")))
HttpRequest<?> request = HttpRequest
.GET(
URI.create(StringUtils.prependUri(
config.getUrl(),
"/catalog/v1/types/tagdefs")))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class)));
}

/**
- * List tags of a topic
+ * List tags of a topic.
+ *
* @param kafkaCluster The Kafka cluster
* @param entityName The topic's name for the API
* @return A list of tags
*/
public Mono<List<TagTopicInfo>> getTopicWithTags(String kafkaCluster, String entityName) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags")))
HttpRequest<?> request = HttpRequest
.GET(
URI.create(StringUtils.prependUri(
config.getUrl(),
"/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags")))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class)));
}

/**
- * Add a tag to a topic
+ * Add a tag to a topic.
+ *
* @param kafkaCluster The Kafka cluster
* @param tagSpecs Tags to add
* @return Information about added tags
*/
public Mono<List<TagTopicInfo>> addTags(String kafkaCluster, List<TagSpecs> tagSpecs) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
-HttpRequest<?> request = HttpRequest.POST(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/entity/tags")), tagSpecs)
+HttpRequest<?> request = HttpRequest
+.POST(
+URI.create(StringUtils.prependUri(
+config.getUrl(),
+"/catalog/v1/entity/tags")), tagSpecs)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class)));
}

/**
- * Delete a tag to a topic
+ * Delete a tag to a topic.
+ *
* @param kafkaCluster The Kafka cluster
* @param entityName The topic's name
* @param tagName The tag to delete
* @return The resume response
*/
public Mono<HttpResponse<Void>> deleteTag(String kafkaCluster, String entityName, String tagName) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
-HttpRequest<?> request = HttpRequest.DELETE(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags/" + tagName)))
+HttpRequest<?> request = HttpRequest
+.DELETE(
+URI.create(StringUtils.prependUri(
+config.getUrl(),
+"/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags/" + tagName)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.exchange(request, Void.class));
}

/**
- * Get the schema registry of the given Kafka cluster
+ * Get the schema registry of the given Kafka cluster.
+ *
* @param kafkaCluster The Kafka cluster
* @return The schema registry configuration
*/
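Together, these four methods wrap Confluent's Stream Catalog REST endpoints under /catalog/v1/. A hypothetical usage sketch, assuming an injected SchemaRegistryClient and a cluster named "my-cluster" ("lkc-abc123" is a made-up cluster id; the "<clusterId>:<topicName>" entity-name format matches what TopicAsyncExecutor builds further below):

// List the tag definitions available on the cluster.
List<TagInfo> defined = schemaRegistryClient.getTags("my-cluster").block();

// Tag one topic; entityType "kafka_topic" matches the catalog URLs above.
TagSpecs spec = TagSpecs.builder()
        .entityName("lkc-abc123:my-topic")
        .entityType("kafka_topic")
        .typeName("PII")
        .build();
List<TagTopicInfo> added = schemaRegistryClient.addTags("my-cluster", List.of(spec)).block();

// Remove the tag again.
schemaRegistryClient.deleteTag("my-cluster", "lkc-abc123:my-topic", "PII").block();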
@@ -2,6 +2,11 @@

import lombok.Builder;

+/**
+ * Tag name.
+ *
+ * @param name Tag name
+ */
@Builder
public record TagInfo(String name) {
}
@@ -2,12 +2,19 @@

import lombok.Builder;

+/**
+ * Tag Specs to call schema registry API.
+ *
+ * @param entityName The entity name
+ * @param entityType The entity type
+ * @param typeName The type name
+ */
@Builder
public record TagSpecs(String entityName, String entityType, String typeName) {

@Override
public String toString() {
return entityName + "/" + typeName;
}

}
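The toString() override renders a spec as "<entityName>/<typeName>", presumably for compact log output. A short sketch with hypothetical values:

TagSpecs specs = TagSpecs.builder()
        .entityName("lkc-abc123:my-topic")
        .entityType("kafka_topic")
        .typeName("PII")
        .build();
System.out.println(specs); // lkc-abc123:my-topic/PII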
@@ -2,6 +2,14 @@

import lombok.Builder;

+/**
+ * Information on tag.
+ *
+ * @param entityName The entity name
+ * @param entityType The entity type
+ * @param typeName The type name
+ * @param entityStatus The entity status
+ */
@Builder
public record TagTopicInfo(String entityName, String entityType, String typeName, String entityStatus) {

@@ -11,18 +11,25 @@
import io.micronaut.context.annotation.EachBean;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigResource;
import java.time.Instant;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
@@ -33,6 +40,8 @@
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;

/**
* Topic executor.
@@ -139,19 +148,27 @@ public void synchronizeTopics() {
}

/**
- * Create tags
+ * Create tags.
+ *
* @param ns4kafkaTopics Topics from ns4kafka
* @param brokerTopics Topics from broker
*/
public void createTags(List<Topic> ns4kafkaTopics, Map<String, Topic> brokerTopics) {
List<TagSpecs> tagsToCreate = ns4kafkaTopics.stream().flatMap(ns4kafkaTopic -> {
Topic brokerTopic = brokerTopics.get(ns4kafkaTopic.getMetadata().getName());

-List<String> existingTags = brokerTopic != null && brokerTopic.getSpec().getTags() != null ? brokerTopic.getSpec().getTags() : Collections.emptyList();
-List<String> newTags = ns4kafkaTopic.getSpec().getTags() != null ? ns4kafkaTopic.getSpec().getTags() : Collections.emptyList();
+List<String> existingTags = brokerTopic != null && brokerTopic.getSpec().getTags() != null
+? brokerTopic.getSpec().getTags()
+: Collections.emptyList();
+List<String> newTags = ns4kafkaTopic.getSpec().getTags() != null
+? ns4kafkaTopic.getSpec().getTags()
+: Collections.emptyList();

return newTags.stream().filter(tag -> !existingTags.contains(tag)).map(tag -> TagSpecs.builder()
.entityName(managedClusterProperties.getConfig().getProperty(CLUSTER_ID)+":"+ns4kafkaTopic.getMetadata().getName())
.entityName(
managedClusterProperties.getConfig().getProperty(CLUSTER_ID)
+ ":"
+ ns4kafkaTopic.getMetadata().getName())
.typeName(tag)
.entityType(TOPIC_ENTITY_TYPE)
.build());
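createTags() above and deleteTags() below reconcile in opposite directions: create what ns4kafka declares but the broker lacks, and delete what the broker carries but ns4kafka no longer declares. A sketch of that set arithmetic with hypothetical inputs (assumes the java.util imports shown above):

List<String> brokerTags = List.of("PII");
List<String> declaredTags = List.of("PII", "GDPR");

// Direction of createTags(): declared but not yet on the broker.
List<String> toCreate = declaredTags.stream()
        .filter(tag -> !brokerTags.contains(tag))
        .toList(); // [GDPR]

// Direction of deleteTags(): on the broker but no longer declared.
Set<String> toDelete = new HashSet<>(brokerTags);
toDelete.removeAll(declaredTags); // empty here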
@@ -169,7 +186,8 @@ public void createTags(List<Topic> ns4kafkaTopics, Map<String, Topic> brokerTopi
}

/**
- * Delete tags
+ * Delete tags.
+ *
* @param ns4kafkaTopics Topics from ns4kafka
* @param brokerTopics Topics from broker
*/
@@ -180,26 +198,38 @@ public void deleteTags(List<Topic> ns4kafkaTopics, Map<String, Topic> brokerTopi
.flatMap(brokerTopic -> {
Optional<Topic> newTopic = ns4kafkaTopics
.stream()
-.filter(ns4kafkaTopic -> ns4kafkaTopic.getMetadata().getName().equals(brokerTopic.getMetadata().getName()))
+.filter(ns4kafkaTopic -> ns4kafkaTopic
+.getMetadata()
+.getName()
+.equals(brokerTopic.getMetadata().getName()))
.findFirst();

Set<String> existingTags = new HashSet<>(brokerTopic.getSpec().getTags());
-Set<String> newTags = newTopic.isPresent() ? new HashSet<>(newTopic.get().getSpec().getTags()) : Collections.emptySet();
+Set<String> newTags = newTopic.isPresent()
+? new HashSet<>(newTopic.get().getSpec().getTags())
+: Collections.emptySet();
existingTags.removeAll(newTags);
return existingTags
.stream()
.map(tag -> TagTopicInfo.builder()
-.entityName(managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":" + brokerTopic.getMetadata().getName())
+.entityName(managedClusterProperties
+.getConfig()
+.getProperty(CLUSTER_ID) + ":" + brokerTopic.getMetadata().getName())
.typeName(tag)
.entityType(TOPIC_ENTITY_TYPE)
.build());
}).toList();

-tagsToDelete.forEach(tag -> schemaRegistryClient.deleteTag(managedClusterProperties.getName(), tag.entityName(), tag.typeName()).block());
+tagsToDelete.forEach(tag ->
+schemaRegistryClient.deleteTag(
+managedClusterProperties.getName(),
+tag.entityName(),
+tag.typeName()).block());
}

/**
- * Delete a topic
+ * Delete a topic.
+ *
* @param topic The topic to delete
*/
public void deleteTopic(Topic topic) throws InterruptedException, ExecutionException, TimeoutException {
@@ -231,14 +261,18 @@ public List<String> listBrokerTopicNames() throws InterruptedException, Executio
}

/**
- * Enrich topics with confluent tags
+ * Enrich topics with confluent tags.
+ *
* @param topics Topics to complete
*/
public void enrichWithTags(Map<String, Topic> topics) {
-if(managedClusterProperties.getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)) {
-topics.forEach((key,value) ->
+if (managedClusterProperties.getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)) {
+topics.forEach((key, value) ->
value.getSpec().setTags(schemaRegistryClient.getTopicWithTags(managedClusterProperties.getName(),
-managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":" + value.getMetadata().getName())
+managedClusterProperties
+.getConfig()
+.getProperty(CLUSTER_ID)
++ ":" + value.getMetadata().getName())
.block().stream().map(TagTopicInfo::typeName).toList()));
}
}
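enrichWithTags() runs only for Confluent Cloud clusters and addresses each topic in the catalog as "<clusterId>:<topicName>". A sketch of that lookup for a single topic, reusing the names from the diff above (all values hypothetical):

String clusterId = managedClusterProperties.getConfig().getProperty(CLUSTER_ID); // e.g. "lkc-abc123"
String entityName = clusterId + ":" + topic.getMetadata().getName();             // "lkc-abc123:my-topic"
List<String> tags = schemaRegistryClient
        .getTopicWithTags(managedClusterProperties.getName(), entityName)
        .block()
        .stream()
        .map(TagTopicInfo::typeName)
        .toList();
topic.getSpec().setTags(tags);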