diff --git a/src/main/java/com/michelin/ns4kafka/services/TopicService.java b/src/main/java/com/michelin/ns4kafka/services/TopicService.java index fb8479a5..1dcb9343 100644 --- a/src/main/java/com/michelin/ns4kafka/services/TopicService.java +++ b/src/main/java/com/michelin/ns4kafka/services/TopicService.java @@ -48,6 +48,8 @@ public class TopicService { @Inject SchemaRegistryClient schemaRegistryClient; + public static final String DYNAMIC_TAGS_CREATION = "dynamic.tags.creation"; + /** * Find all topics. * @@ -351,6 +353,30 @@ public List validateTags(Namespace namespace, Topic topic) { Set tagNames = schemaRegistryClient.getTags(namespace.getMetadata().getCluster()) .map(tags -> tags.stream().map(TagInfo::name).collect(Collectors.toSet())).block(); + List unavailableTagNames = topic.getSpec().getTags() + .stream() + .filter(tagName -> tagNames != null && !tagNames.contains(tagName)) + .toList(); + + if(topicCluster.isPresent() && + Boolean.parseBoolean( + topicCluster.get().getConfig().getProperty( + DYNAMIC_TAGS_CREATION, + "true"))) { + + List tagsToCreate = unavailableTagNames + .stream() + .map(TagInfo::new) + .collect(Collectors.toList()); + + schemaRegistryClient.createTags( + tagsToCreate, + namespace.getMetadata().getCluster()) + .block(); + + return validationErrors; + } + if (tagNames == null || tagNames.isEmpty()) { validationErrors.add(String.format( "Invalid value %s for tags: No tags allowed.", @@ -358,11 +384,6 @@ public List validateTags(Namespace namespace, Topic topic) { return validationErrors; } - List unavailableTagNames = topic.getSpec().getTags() - .stream() - .filter(tagName -> !tagNames.contains(tagName)) - .toList(); - if (!unavailableTagNames.isEmpty()) { validationErrors.add(String.format( "Invalid value %s for tags: Available tags are %s.", diff --git a/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java b/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java index dcb2c7a5..87d6132e 100644 --- a/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java +++ b/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java @@ -186,9 +186,9 @@ public Mono deleteCurrentCompatibilityBySubject(Str public Mono> getTags(String kafkaCluster) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); HttpRequest request = HttpRequest - .GET(URI.create(StringUtils.prependUri( - config.getUrl(), "/catalog/v1/types/tagdefs"))) - .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + .GET(URI.create(StringUtils.prependUri( + config.getUrl(), "/catalog/v1/types/tagdefs"))) + .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class))); } @@ -225,6 +225,21 @@ public Mono> addTags(String kafkaCluster, List return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class))); } + /** + * Create tags. + * + * @param tags The list of tags to create + * @param kafkaCluster The Kafka cluster + * @return Information about created tags + */ + public Mono> createTags(List tags, String kafkaCluster) { + ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); + HttpRequest request = HttpRequest.POST(URI.create(StringUtils.prependUri( + config.getUrl(), "/catalog/v1/types/tagdefs")), tags) + .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class))); + } + /** * Delete a tag to a topic. *