Manage Confluent tags
adriencalime committed Oct 3, 2023
1 parent 83d9001 commit 55b8c8e
Showing 1 changed file with 33 additions and 21 deletions.
@@ -135,7 +135,7 @@ public void synchronizeTopics() {
createTopics(toCreate);
alterTopics(toUpdate, toCheckConf);

if (managedClusterProperties.getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)) {
if (isConfluent()) {
createTags(ns4kafkaTopics, brokerTopics);
deleteTags(ns4kafkaTopics, brokerTopics);
}
@@ -154,25 +154,23 @@ public void synchronizeTopics()
* @param brokerTopics Topics from broker
*/
public void createTags(List<Topic> ns4kafkaTopics, Map<String, Topic> brokerTopics) {
List<TagSpecs> tagsToCreate = ns4kafkaTopics.stream().flatMap(ns4kafkaTopic -> {
Topic brokerTopic = brokerTopics.get(ns4kafkaTopic.getMetadata().getName());

List<String> existingTags = brokerTopic != null && brokerTopic.getSpec().getTags() != null
? brokerTopic.getSpec().getTags()
: Collections.emptyList();
List<String> newTags = ns4kafkaTopic.getSpec().getTags() != null
? ns4kafkaTopic.getSpec().getTags()
: Collections.emptyList();

return newTags.stream().filter(tag -> !existingTags.contains(tag)).map(tag -> TagSpecs.builder()
.entityName(
managedClusterProperties.getConfig().getProperty(CLUSTER_ID)
+ ":"
+ ns4kafkaTopic.getMetadata().getName())
.typeName(tag)
.entityType(TOPIC_ENTITY_TYPE)
.build());
}).toList();
List<TagSpecs> tagsToCreate = ns4kafkaTopics
.stream()
.filter(topic -> topic.getMetadata().getGeneration() == 1)
.flatMap(ns4kafkaTopic -> {
Topic brokerTopic = brokerTopics.get(ns4kafkaTopic.getMetadata().getName());
Set<String> existingTags = brokerTopic != null ? new HashSet<>(brokerTopic.getSpec().getTags()) : Collections.emptySet();
Set<String> newTags = new HashSet<>(ns4kafkaTopic.getSpec().getTags());
newTags.removeAll(existingTags);
return newTags
.stream()
.map(tag -> TagSpecs.builder()
.entityName(managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":" + ns4kafkaTopic.getMetadata().getName())
.typeName(tag)
.entityType(TOPIC_ENTITY_TYPE)
.build());
})
.toList();

if (!tagsToCreate.isEmpty()) {
String stringTags = String.join(",", tagsToCreate
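For reference, the reworked createTags above computes the tags to attach as a set difference: tags declared on the Ns4Kafka topic minus tags already present on the broker topic, with deleteTags (next hunk) doing the reverse. A minimal, self-contained sketch of that idea using plain Java collections; the class name and tag values are illustrative only and do not come from the ns4kafka codebase.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TagDiffSketch {
    public static void main(String[] args) {
        // Tags declared on the Ns4Kafka topic (desired state).
        Set<String> declaredTags = new HashSet<>(List.of("PII", "FINANCE"));
        // Tags currently attached to the topic in Confluent Cloud (actual state).
        Set<String> brokerTags = new HashSet<>(List.of("FINANCE", "DEPRECATED"));

        // To create: declared but not yet on the broker.
        Set<String> toCreate = new HashSet<>(declaredTags);
        toCreate.removeAll(brokerTags);

        // To delete: on the broker but no longer declared.
        Set<String> toDelete = new HashSet<>(brokerTags);
        toDelete.removeAll(declaredTags);

        System.out.println("toCreate = " + toCreate); // prints [PII]
        System.out.println("toDelete = " + toDelete); // prints [DEPRECATED]
    }
}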
@@ -225,6 +223,16 @@ public void deleteTags(List<Topic> ns4kafkaTopics, Map<String, Topic> brokerTopics) {
managedClusterProperties.getName(),
tag.entityName(),
tag.typeName()).block());

if (!tagsToDelete.isEmpty()) {
tagsToDelete
.forEach(tag -> schemaRegistryClient.deleteTag(
managedClusterProperties.getName(),
tag.entityName(),
tag.typeName())
.subscribe(success -> log.debug(String.format("Success deleting tag %s.", tag)),
error -> log.error(String.format("Error deleting tag %s.", tag))));
}
}
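For reference, the change above replaces a blocking block() on each tag deletion with subscribe(...) taking separate success and error callbacks, so a failed deletion is logged rather than thrown out of the loop. A rough, self-contained sketch of that pattern, assuming a Project Reactor Mono (which matches the block()/subscribe() signatures in the diff) and a simulated failing call; reactor-core is required on the classpath.

import reactor.core.publisher.Mono;

public class SubscribeSketch {
    public static void main(String[] args) {
        // Simulated remote call that fails, e.g. a tag deletion rejected by the API.
        Mono<String> deletion = Mono.error(new IllegalStateException("tag not found"));

        // block() would rethrow the error here and abort the surrounding forEach.
        // subscribe() routes success and failure to separate callbacks instead.
        deletion.subscribe(
                success -> System.out.println("Success deleting tag " + success + "."),
                error -> System.err.println("Error deleting tag: " + error.getMessage()));
    }
}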

/**
@@ -266,7 +274,7 @@ public List<String> listBrokerTopicNames() throws InterruptedException, ExecutionException
* @param topics Topics to complete
*/
public void enrichWithTags(Map<String, Topic> topics) {
if (managedClusterProperties.getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)) {
if (isConfluent()) {
topics.forEach((key, value) ->
value.getSpec().setTags(schemaRegistryClient.getTopicWithTags(managedClusterProperties.getName(),
managedClusterProperties
@@ -277,6 +285,10 @@ public void enrichWithTags(Map<String, Topic> topics) {
}
}

public boolean isConfluent() {
return managedClusterProperties.getProvider().equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD);
}

/**
* Collect all topics on broker from a list of topic names.
*
