Skip to content

Commit

Permalink
Manage Confluent tags
Browse files Browse the repository at this point in the history
  • Loading branch information
adriencalime committed Oct 3, 2023
1 parent 59c532c commit f2a2a30
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@
import jakarta.inject.Inject;
import jakarta.validation.Valid;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
Expand Down
11 changes: 4 additions & 7 deletions src/main/java/com/michelin/ns4kafka/services/TopicService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -333,12 +330,12 @@ public Map<TopicPartition, Long> deleteRecords(Topic topic, Map<TopicPartition,
public List<String> validateTags(Namespace namespace, Topic topic) {
List<String> validationErrors = new ArrayList<>();

Optional<KafkaAsyncExecutorConfig> topicCluster = kafkaAsyncExecutorConfig
Optional<ManagedClusterProperties> topicCluster = managedClusterProperties
.stream()
.filter(cluster -> namespace.getMetadata().getCluster().equals(cluster.getName()))
.findFirst();

if(topicCluster.isPresent() && !topicCluster.get().getProvider().equals(KafkaAsyncExecutorConfig.KafkaProvider.CONFLUENT_CLOUD)) {
if(topicCluster.isPresent() && !topicCluster.get().getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)) {
validationErrors.add(String.format("Invalid value (%s) for tags: Tags are not currently supported.", String.join(",", topic.getSpec().getTags())));
return validationErrors;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package com.michelin.ns4kafka.services.clients.schema;

import com.michelin.ns4kafka.properties.ManagedClusterProperties;
import com.michelin.ns4kafka.services.clients.schema.entities.SchemaCompatibilityCheckResponse;
import com.michelin.ns4kafka.services.clients.schema.entities.SchemaCompatibilityRequest;
import com.michelin.ns4kafka.services.clients.schema.entities.SchemaCompatibilityResponse;
import com.michelin.ns4kafka.services.clients.schema.entities.SchemaRequest;
import com.michelin.ns4kafka.services.clients.schema.entities.SchemaResponse;
import com.michelin.ns4kafka.services.clients.schema.entities.*;
import com.michelin.ns4kafka.utils.exceptions.ResourceValidationException;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.StringUtils;
Expand Down Expand Up @@ -179,7 +175,7 @@ public Mono<SchemaCompatibilityResponse> deleteCurrentCompatibilityBySubject(Str
* @return A list of tags
*/
public Mono<List<TagInfo>> getTags(String kafkaCluster) {
KafkaAsyncExecutorConfig.RegistryConfig config = getSchemaRegistry(kafkaCluster);
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/types/tagdefs")))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class)));
Expand All @@ -192,7 +188,7 @@ public Mono<List<TagInfo>> getTags(String kafkaCluster) {
* @return A list of tags
*/
public Mono<List<TagTopicInfo>> getTopicWithTags(String kafkaCluster, String entityName) {
KafkaAsyncExecutorConfig.RegistryConfig config = getSchemaRegistry(kafkaCluster);
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags")))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class)));
Expand All @@ -205,7 +201,7 @@ public Mono<List<TagTopicInfo>> getTopicWithTags(String kafkaCluster, String ent
* @return Information about added tags
*/
public Mono<List<TagTopicInfo>> addTags(String kafkaCluster, List<TagSpecs> tagSpecs) {
KafkaAsyncExecutorConfig.RegistryConfig config = getSchemaRegistry(kafkaCluster);
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.POST(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/entity/tags")), tagSpecs)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class)));
Expand All @@ -219,7 +215,7 @@ public Mono<List<TagTopicInfo>> addTags(String kafkaCluster, List<TagSpecs> tagS
* @return The resume response
*/
public Mono<HttpResponse<Void>> deleteTag(String kafkaCluster, String entityName, String tagName) {
KafkaAsyncExecutorConfig.RegistryConfig config = getSchemaRegistry(kafkaCluster);
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.DELETE(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags/" + tagName)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.exchange(request, Void.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -56,8 +51,10 @@
public class TopicAsyncExecutor {
private final ManagedClusterProperties managedClusterProperties;

@Inject
TopicRepository topicRepository;

@Inject
SchemaRegistryClient schemaRegistryClient;

public TopicAsyncExecutor(ManagedClusterProperties managedClusterProperties) {
Expand Down Expand Up @@ -156,14 +153,14 @@ public void createTags(List<Topic> ns4kafkaTopics, Map<String, Topic> brokerTopi
List<String> newTags = ns4kafkaTopic.getSpec().getTags() != null ? ns4kafkaTopic.getSpec().getTags() : Collections.emptyList();

return newTags.stream().filter(tag -> !existingTags.contains(tag)).map(tag -> TagSpecs.builder()
.entityName(kafkaAsyncExecutorConfig.getConfig().getProperty(CLUSTER_ID)+":"+ns4kafkaTopic.getMetadata().getName())
.entityName(managedClusterProperties.getConfig().getProperty(CLUSTER_ID)+":"+ns4kafkaTopic.getMetadata().getName())
.typeName(tag)
.entityType(TOPIC_ENTITY_TYPE)
.build());
}).toList();

if(!tagsToCreate.isEmpty()) {
schemaRegistryClient.addTags(kafkaAsyncExecutorConfig.getName(), tagsToCreate).block();
schemaRegistryClient.addTags(managedClusterProperties.getName(), tagsToCreate).block();
}
}

Expand All @@ -182,13 +179,13 @@ public void deleteTags(List<Topic> ns4kafkaTopics, Map<String, Topic> brokerTopi
List<String> existingTags = brokerTopic.getSpec().getTags() != null ? brokerTopic.getSpec().getTags() : Collections.emptyList();

return existingTags.stream().filter(tag -> !newTags.contains(tag)).map(tag -> TagTopicInfo.builder()
.entityName(kafkaAsyncExecutorConfig.getConfig().getProperty(CLUSTER_ID)+":"+brokerTopic.getMetadata().getName())
.entityName(managedClusterProperties.getConfig().getProperty(CLUSTER_ID)+":"+brokerTopic.getMetadata().getName())
.typeName(tag)
.entityType(TOPIC_ENTITY_TYPE)
.build());
}).toList();

tagsToDelete.forEach(tag -> schemaRegistryClient.deleteTag(kafkaAsyncExecutorConfig.getName(), tag.entityName(), tag.typeName()).block());
tagsToDelete.forEach(tag -> schemaRegistryClient.deleteTag(managedClusterProperties.getName(), tag.entityName(), tag.typeName()).block());
}

/**
Expand Down Expand Up @@ -228,11 +225,11 @@ public List<String> listBrokerTopicNames() throws InterruptedException, Executio
* @param topics Topics to complete
*/
public void completeWithTags(Map<String, Topic> topics) {
if(kafkaAsyncExecutorConfig.getProvider().equals(KafkaAsyncExecutorConfig.KafkaProvider.CONFLUENT_CLOUD)) {
if(managedClusterProperties.getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)) {
topics.entrySet().stream()
.forEach(entry ->
entry.getValue().getSpec().setTags(schemaRegistryClient.getTopicWithTags(kafkaAsyncExecutorConfig.getName(),
kafkaAsyncExecutorConfig.getConfig().getProperty(CLUSTER_ID) + ":" + entry.getValue().getMetadata().getName())
entry.getValue().getSpec().setTags(schemaRegistryClient.getTopicWithTags(managedClusterProperties.getName(),
managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":" + entry.getValue().getMetadata().getName())
.block().stream().map(TagTopicInfo::typeName).toList()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.michelin.ns4kafka.models.Topic;
import com.michelin.ns4kafka.properties.ManagedClusterProperties;
import com.michelin.ns4kafka.repositories.TopicRepository;
import com.michelin.ns4kafka.config.KafkaAsyncExecutorConfig;
import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient;
import com.michelin.ns4kafka.services.clients.schema.entities.TagInfo;
import com.michelin.ns4kafka.services.executors.TopicAsyncExecutor;
Expand Down Expand Up @@ -902,7 +901,7 @@ void shouldTagsBeValid() {

List<TagInfo> tagInfo = List.of(TagInfo.builder().name("TAG_TEST").build());

when(kafkaAsyncExecutorConfigs.stream()).thenReturn(Stream.of(new KafkaAsyncExecutorConfig("local", KafkaAsyncExecutorConfig.KafkaProvider.CONFLUENT_CLOUD)));
when(managedClusterProperties.stream()).thenReturn(Stream.of(new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)));
when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(tagInfo));

List<String> validationErrors = topicService.validateTags(ns, topic);
Expand All @@ -924,7 +923,7 @@ void shouldTagsBeInvalidWhenNotConfluentCloud() {
.tags(List.of("TAG_TEST")).build())
.build();

when(kafkaAsyncExecutorConfigs.stream()).thenReturn(Stream.of(new KafkaAsyncExecutorConfig("local", KafkaAsyncExecutorConfig.KafkaProvider.SELF_MANAGED)));
when(managedClusterProperties.stream()).thenReturn(Stream.of(new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.SELF_MANAGED)));

List<String> validationErrors = topicService.validateTags(ns, topic);
assertEquals(1, validationErrors.size());
Expand All @@ -946,7 +945,7 @@ void shouldTagsBeInvalidWhenNoTagsAllowed() {
.tags(List.of("TAG_TEST")).build())
.build();

when(kafkaAsyncExecutorConfigs.stream()).thenReturn(Stream.of(new KafkaAsyncExecutorConfig("local", KafkaAsyncExecutorConfig.KafkaProvider.CONFLUENT_CLOUD)));
when(managedClusterProperties.stream()).thenReturn(Stream.of(new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)));
when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(Collections.emptyList()));

List<String> validationErrors = topicService.validateTags(ns, topic);
Expand All @@ -971,7 +970,7 @@ void shouldTagsBeInvalidWhenNotAllowed() {

List<TagInfo> tagInfo = List.of(TagInfo.builder().name("TAG_TEST").build());

when(kafkaAsyncExecutorConfigs.stream()).thenReturn(Stream.of(new KafkaAsyncExecutorConfig("local", KafkaAsyncExecutorConfig.KafkaProvider.CONFLUENT_CLOUD)));
when(managedClusterProperties.stream()).thenReturn(Stream.of(new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)));
when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(tagInfo));

List<String> validationErrors = topicService.validateTags(ns, topic);
Expand Down

0 comments on commit f2a2a30

Please sign in to comment.