Manage Confluent Cloud tags #321

Merged · 28 commits · Oct 9, 2023
36 changes: 20 additions & 16 deletions README.md
@@ -14,22 +14,24 @@ Ns4Kafka introduces namespace functionality to Apache Kafka, as well as a new de

## Table of Contents

* [Principles](#principles)
* [Namespace Isolation](#namespace-isolation)
* [Desired State](#desired-state)
* [Server Side Validation](#server-side-validation)
* [CLI](#cli)
* [Download](#download)
* [Install](#install)
* [Demo Environment](#demo-environment)
* [Configuration](#configuration)
* [GitLab Authentication](#gitlab-authentication)
* [Admin Account](#admin-account)
* [Kafka Broker Authentication](#kafka-broker-authentication)
* [Managed clusters](#managed-clusters)
* [AKHQ](#akhq)
* [Administration](#administration)
* [Contribution](#contribution)
- [Ns4Kafka](#ns4kafka)
> **Collaborator review comment:** There is no need to include the Table of Contents entry in the table of contents itself.

- [Table of Contents](#table-of-contents)
- [Principles](#principles)
- [Namespace Isolation](#namespace-isolation)
- [Desired State](#desired-state)
- [Server Side Validation](#server-side-validation)
- [CLI](#cli)
- [Download](#download)
- [Install](#install)
- [Demo Environment](#demo-environment)
- [Configuration](#configuration)
- [GitLab Authentication](#gitlab-authentication)
- [Admin Account](#admin-account)
- [Kafka Broker Authentication](#kafka-broker-authentication)
- [Managed clusters](#managed-clusters)
- [AKHQ](#akhq)
- [Administration](#administration)
- [Contribution](#contribution)

## Principles

@@ -168,6 +170,7 @@ ns4kafka:
sasl.mechanism: "PLAIN"
security.protocol: "SASL_PLAINTEXT"
sasl.jaas.config: "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin\";"
cluster.id: "lkc-abcde"
schema-registry:
url: "http://localhost:8081"
basicAuthUsername: "user"
@@ -190,6 +193,7 @@ The name for each managed cluster has to be unique. This is the name you have t
| drop-unsync-acls | boolean | Should Ns4Kafka drop unsynchronized ACLs |
| provider | boolean | The kind of cluster. Either SELF_MANAGED or CONFLUENT_CLOUD |
| config.bootstrap.servers | string | The location of the cluster's servers |
| config.cluster.id | string | The Confluent Cloud cluster ID, required to manage tags |
| schema-registry.url | string | The location of the Schema Registry |
| schema-registry.basicAuthUsername | string | Basic authentication username to the Schema Registry |
| schema-registry.basicAuthPassword | string | Basic authentication password to the Schema Registry |
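Putting the rows above together with the config snippet from this diff, a managed-cluster entry that enables tag management might look like the sketch below. The cluster name, endpoints, and credentials are placeholders, and the exact nesting under `ns4kafka.managed-clusters` is assumed from the project's configuration layout:

```yaml
ns4kafka:
  managed-clusters:
    my-confluent-cluster:              # unique name, referenced by namespaces
      provider: "CONFLUENT_CLOUD"      # tags only work on Confluent Cloud clusters
      drop-unsync-acls: false
      config:
        bootstrap.servers: "pkc-xxxxx.eu-west-1.aws.confluent.cloud:9092"
        cluster.id: "lkc-abcde"        # Confluent Cloud cluster id, needed for tag management
      schema-registry:
        url: "https://psrc-xxxxx.eu-west-1.aws.confluent.cloud"
        basicAuthUsername: "user"
        basicAuthPassword: "password"
```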
@@ -88,6 +88,8 @@ public HttpResponse<Topic> apply(String namespace, @Valid @Body Topic topic, @Qu
validationErrors.addAll(topicService.validateTopicUpdate(ns, existingTopic.get(), topic));
}

validateTags(topic, existingTopic, validationErrors, ns);

if (!validationErrors.isEmpty()) {
throw new ResourceValidationException(validationErrors, topic.getKind(), topic.getMetadata().getName());
}
@@ -121,6 +123,23 @@ public HttpResponse<Topic> apply(String namespace, @Valid @Body Topic topic, @Qu
return formatHttpResponse(topicService.create(topic), status);
}

/**
 * Validate tags when new tags are added to the topic; tag removals are not validated
 * @param topic The topic to apply
 * @param existingTopic The existing topic, if any
 * @param validationErrors The list of validation errors to append to
 * @param ns The namespace
 */
public void validateTags(Topic topic, Optional<Topic> existingTopic, List<String> validationErrors, Namespace ns) {
    if (topic.getMetadata().getTags() == null) {
        topic.getMetadata().setTags(Collections.emptyList());
    }

    List<String> existingTags = existingTopic.isPresent() && existingTopic.get().getMetadata().getTags() != null
            ? existingTopic.get().getMetadata().getTags()
            : Collections.emptyList();

    if (topic.getMetadata().getTags().stream().anyMatch(newTag -> !existingTags.contains(newTag))) {
        validationErrors.addAll(topicService.validateTags(ns, topic));
    }
}
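The intent of `validateTags` above — run tag validation only when the request introduces a tag the existing topic does not already carry, so that deletions alone never trigger validation — can be sketched in isolation. The class and method names here are illustrative, not part of the PR:

```java
import java.util.Collections;
import java.util.List;

public class TagDiff {
    /**
     * Returns true when newTags contains at least one tag absent from
     * existingTags, i.e. validation is needed. Null lists are treated as empty.
     */
    public static boolean hasNewTags(List<String> newTags, List<String> existingTags) {
        List<String> current = newTags == null ? Collections.emptyList() : newTags;
        List<String> previous = existingTags == null ? Collections.emptyList() : existingTags;
        return current.stream().anyMatch(tag -> !previous.contains(tag));
    }

    public static void main(String[] args) {
        // Adding a brand-new tag requires validation; dropping one does not
        System.out.println(hasNewTags(List.of("PII", "GDPR"), List.of("PII")));
        System.out.println(hasNewTags(List.of("PII"), List.of("PII", "GDPR")));
    }
}
```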

/**
* Delete a topic
* @param namespace The namespace
3 changes: 2 additions & 1 deletion src/main/java/com/michelin/ns4kafka/models/ObjectMeta.java
@@ -8,6 +8,7 @@
import javax.validation.constraints.Pattern;
import java.util.Date;
import java.util.Map;
import java.util.List;

@Data
@Builder
@@ -26,5 +27,5 @@ public class ObjectMeta {
@EqualsAndHashCode.Exclude
@JsonFormat(shape = JsonFormat.Shape.STRING)
private Date creationTimestamp;

private List<String> tags;
}
42 changes: 41 additions & 1 deletion src/main/java/com/michelin/ns4kafka/services/TopicService.java
@@ -5,6 +5,8 @@
import com.michelin.ns4kafka.models.Namespace;
import com.michelin.ns4kafka.models.Topic;
import com.michelin.ns4kafka.repositories.TopicRepository;
import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient;
import com.michelin.ns4kafka.services.clients.schema.entities.TagInfo;
import com.michelin.ns4kafka.services.executors.TopicAsyncExecutor;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
@@ -34,6 +36,9 @@ public class TopicService {
@Inject
List<KafkaAsyncExecutorConfig> kafkaAsyncExecutorConfig;

@Inject
SchemaRegistryClient schemaRegistryClient;

/**
* Find all topics
* @return The list of topics
@@ -170,7 +175,6 @@ public List<String> validateTopicUpdate(Namespace namespace, Topic existingTopic
validationErrors.add(String.format("Invalid value %s for configuration cleanup.policy: Altering topic configuration from `delete` to `compact` is not currently supported. Please create a new topic with `compact` policy specified instead.",
newTopic.getSpec().getConfigs().get(CLEANUP_POLICY_CONFIG)));
}

return validationErrors;
}

@@ -284,4 +288,40 @@ public Map<TopicPartition, Long> deleteRecords(Topic topic, Map<TopicPartition,
throw new InterruptedException(e.getMessage());
}
}

/**
 * Validate the tags of a topic
 * @param namespace The namespace
 * @param topic The topic holding the tags
 * @return A list of validation errors
 */
public List<String> validateTags(Namespace namespace, Topic topic) {
List<String> validationErrors = new ArrayList<>();

Optional<KafkaAsyncExecutorConfig> topicCluster = kafkaAsyncExecutorConfig
.stream()
.filter(cluster -> namespace.getMetadata().getCluster().equals(cluster.getName()))
.findFirst();

if (topicCluster.isPresent() && !topicCluster.get().getProvider().equals(KafkaAsyncExecutorConfig.KafkaProvider.CONFLUENT_CLOUD)) {
    validationErrors.add("Tags can only be used on Confluent Cloud clusters.");
    return validationErrors;
}

Set<String> tagNames = schemaRegistryClient.getTags(namespace.getMetadata().getCluster())
.map(tags -> tags.stream().map(TagInfo::name).collect(Collectors.toSet())).block();

if (tagNames == null || tagNames.isEmpty()) {
    validationErrors.add(String.format("Invalid value (%s) for tags: No tags are defined on the Kafka cluster.",
            String.join(" ", topic.getMetadata().getTags())));
    return validationErrors;
}

if (!tagNames.containsAll(topic.getMetadata().getTags())) {
    validationErrors.add(String.format("Invalid value (%s) for tags: Available tags are (%s).",
            String.join(" ", topic.getMetadata().getTags()), String.join(" ", tagNames)));
}

return validationErrors;
}
}
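The second half of `validateTags` reduces to a pure check of the requested tag names against the set of tags defined on the cluster. A minimal self-contained sketch of that check, mirroring the service's error messages (the class name is illustrative, not part of the PR):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class TagNameCheck {
    /** Returns validation errors for requested tags not defined on the cluster. */
    public static List<String> validate(List<String> requestedTags, Set<String> definedTags) {
        List<String> errors = new ArrayList<>();
        if (definedTags == null || definedTags.isEmpty()) {
            errors.add(String.format("Invalid value (%s) for tags: No tags are defined on the Kafka cluster.",
                    String.join(" ", requestedTags)));
            return errors;
        }
        if (!definedTags.containsAll(requestedTags)) {
            errors.add(String.format("Invalid value (%s) for tags: Available tags are (%s).",
                    String.join(" ", requestedTags), String.join(" ", definedTags)));
        }
        return errors;
    }

    public static void main(String[] args) {
        // A tag known to the cluster passes; an unknown one produces an error
        System.out.println(validate(List.of("PII"), Set.of("PII", "GDPR")));
        System.out.println(validate(List.of("UNKNOWN"), Set.of("PII")));
    }
}
```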
@@ -3,8 +3,10 @@
import com.michelin.ns4kafka.config.KafkaAsyncExecutorConfig;
import com.michelin.ns4kafka.services.clients.schema.entities.*;
import com.michelin.ns4kafka.utils.exceptions.ResourceValidationException;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.HttpClient;
@@ -146,6 +148,58 @@ public Mono<SchemaCompatibilityResponse> deleteCurrentCompatibilityBySubject(Str
return Mono.from(httpClient.retrieve(request, SchemaCompatibilityResponse.class));
}

/**
* List tags
* @param kafkaCluster The Kafka cluster
* @return A list of tags
*/
public Mono<List<TagInfo>> getTags(String kafkaCluster) {
KafkaAsyncExecutorConfig.RegistryConfig config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/types/tagdefs")))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class)));
}

/**
* List tags of a topic
* @param kafkaCluster The Kafka cluster
* @param entityName The topic's name for the API
* @return A list of tags
*/
public Mono<List<TagTopicInfo>> getTopicWithTags(String kafkaCluster, String entityName) {
KafkaAsyncExecutorConfig.RegistryConfig config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags")))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class)));
}

/**
* Add a tag to a topic
* @param kafkaCluster The Kafka cluster
* @param tagSpecs Tags to add
* @return Information about added tags
*/
public Mono<List<TagTopicInfo>> addTags(String kafkaCluster, List<TagSpecs> tagSpecs) {
KafkaAsyncExecutorConfig.RegistryConfig config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.POST(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/entity/tags")), tagSpecs)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class)));
}

/**
 * Delete a tag from a topic
 * @param kafkaCluster The Kafka cluster
 * @param entityName The topic's name
 * @param tagName The tag to delete
 * @return The HTTP response of the deletion
 */
public Mono<HttpResponse<Void>> deleteTag(String kafkaCluster, String entityName, String tagName) {
KafkaAsyncExecutorConfig.RegistryConfig config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.DELETE(URI.create(StringUtils.prependUri(config.getUrl(), "/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags/" + tagName)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.exchange(request, Void.class));
}
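The four client methods above all target the Stream Catalog API under `/catalog/v1`, differing only in the path suffix and HTTP verb. The path construction, taken directly from the requests in this diff, can be isolated as a small sketch (the helper class is illustrative):

```java
public class CatalogPaths {
    /** GET: list tag definitions on the cluster. */
    static String tagDefs() {
        return "/catalog/v1/types/tagdefs";
    }

    /** GET: list tags attached to a kafka_topic entity. */
    static String topicTags(String entityName) {
        return "/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags";
    }

    /** POST: attach tags to entities (body carries the TagSpecs list). */
    static String tagAssignment() {
        return "/catalog/v1/entity/tags";
    }

    /** DELETE: detach one tag from a kafka_topic entity. */
    static String deleteTag(String entityName, String tagName) {
        return topicTags(entityName) + "/" + tagName;
    }

    public static void main(String[] args) {
        System.out.println(deleteTag("orders", "PII"));
    }
}
```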

/**
* Get the schema registry of the given Kafka cluster
* @param kafkaCluster The Kafka cluster
@@ -0,0 +1,7 @@
package com.michelin.ns4kafka.services.clients.schema.entities;

import lombok.Builder;

@Builder
public record TagInfo(String name) {
}
@@ -0,0 +1,7 @@
package com.michelin.ns4kafka.services.clients.schema.entities;

import lombok.Builder;

@Builder
public record TagSpecs(String entityName, String entityType, String typeName) {
}
@@ -0,0 +1,7 @@
package com.michelin.ns4kafka.services.clients.schema.entities;

import lombok.Builder;

@Builder
public record TagTopicInfo(String entityName, String entityType, String typeName, String entityStatus) {
}