Skip to content

Commit

Permalink
Improve tags managment
Browse files Browse the repository at this point in the history
  • Loading branch information
adriencalime committed Dec 4, 2023
1 parent b599d02 commit b71928b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 8 deletions.
31 changes: 26 additions & 5 deletions src/main/java/com/michelin/ns4kafka/services/TopicService.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class TopicService {
@Inject
SchemaRegistryClient schemaRegistryClient;

public static final String DYNAMIC_TAGS_CREATION = "dynamic.tags.creation";

/**
* Find all topics.
*
Expand Down Expand Up @@ -351,18 +353,37 @@ public List<String> validateTags(Namespace namespace, Topic topic) {
Set<String> tagNames = schemaRegistryClient.getTags(namespace.getMetadata().getCluster())
.map(tags -> tags.stream().map(TagInfo::name).collect(Collectors.toSet())).block();

List<String> unavailableTagNames = topic.getSpec().getTags()
.stream()
.filter(tagName -> tagNames != null && !tagNames.contains(tagName))
.toList();

if(topicCluster.isPresent() &&
Boolean.parseBoolean(
topicCluster.get().getConfig().getProperty(
DYNAMIC_TAGS_CREATION,
"true"))) {

List<TagInfo> tagsToCreate = unavailableTagNames
.stream()
.map(TagInfo::new)
.collect(Collectors.toList());

schemaRegistryClient.createTags(
tagsToCreate,
namespace.getMetadata().getCluster())
.block();

return validationErrors;
}

if (tagNames == null || tagNames.isEmpty()) {
validationErrors.add(String.format(
"Invalid value %s for tags: No tags allowed.",
String.join(", ", topic.getSpec().getTags())));
return validationErrors;
}

List<String> unavailableTagNames = topic.getSpec().getTags()
.stream()
.filter(tagName -> !tagNames.contains(tagName))
.toList();

if (!unavailableTagNames.isEmpty()) {
validationErrors.add(String.format(
"Invalid value %s for tags: Available tags are %s.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ public Mono<SchemaCompatibilityResponse> deleteCurrentCompatibilityBySubject(Str
public Mono<List<TagInfo>> getTags(String kafkaCluster) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.GET(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/types/tagdefs")))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
.GET(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/types/tagdefs")))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class)));
}

Expand Down Expand Up @@ -225,6 +225,21 @@ public Mono<List<TagTopicInfo>> addTags(String kafkaCluster, List<TagTopicInfo>
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class)));
}

/**
* Create tags.
*
* @param tags The list of tags to create
* @param kafkaCluster The Kafka cluster
* @return Information about created tags
*/
public Mono<List<TagInfo>> createTags(List<TagInfo> tags, String kafkaCluster) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.POST(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/types/tagdefs")), tags)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class)));
}

/**
* Delete a tag to a topic.
*
Expand Down

0 comments on commit b71928b

Please sign in to comment.