Create tags when missing on Schema Registry #345

Merged · 15 commits · Dec 15, 2023
@@ -108,6 +108,7 @@ public HttpResponse<Topic> apply(String namespace, @Valid @Body Topic topic,
validationErrors.addAll(topicService.validateTopicUpdate(ns, existingTopic.get(), topic));
}

topic.getSpec().getTags().replaceAll(String::toUpperCase);
List<String> existingTags = existingTopic
.map(oldTopic -> oldTopic.getSpec().getTags())
.orElse(Collections.emptyList());
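
For context on the added replaceAll call: java.util.List.replaceAll mutates the list in place, so the topic's tags are normalized to uppercase before being compared with the existing topic's tags. A minimal sketch with hypothetical tag values:

    import java.util.ArrayList;
    import java.util.List;

    public class TagNormalizationSketch {
        public static void main(String[] args) {
            // Hypothetical tags; in the controller they come from topic.getSpec().getTags().
            List<String> tags = new ArrayList<>(List.of("pii", "Finance"));
            tags.replaceAll(String::toUpperCase); // in-place mutation
            System.out.println(tags); // prints [PII, FINANCE]
        }
    }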
25 changes: 0 additions & 25 deletions src/main/java/com/michelin/ns4kafka/services/TopicService.java
@@ -45,9 +45,6 @@ public class TopicService {
@Inject
List<ManagedClusterProperties> managedClusterProperties;

@Inject
SchemaRegistryClient schemaRegistryClient;

/**
* Find all topics.
*
@@ -348,28 +345,6 @@ public List<String> validateTags(Namespace namespace, Topic topic) {
return validationErrors;
}

Set<String> tagNames = schemaRegistryClient.getTags(namespace.getMetadata().getCluster())
.map(tags -> tags.stream().map(TagInfo::name).collect(Collectors.toSet())).block();

if (tagNames == null || tagNames.isEmpty()) {
validationErrors.add(String.format(
"Invalid value %s for tags: No tags allowed.",
String.join(", ", topic.getSpec().getTags())));
return validationErrors;
}

List<String> unavailableTagNames = topic.getSpec().getTags()
.stream()
.filter(tagName -> !tagNames.contains(tagName))
.toList();

if (!unavailableTagNames.isEmpty()) {
validationErrors.add(String.format(
"Invalid value %s for tags: Available tags are %s.",
String.join(", ", unavailableTagNames),
String.join(", ", tagNames)));
}

return validationErrors;
}
}
@@ -186,9 +186,9 @@ public Mono<SchemaCompatibilityResponse> deleteCurrentCompatibilityBySubject(Str
public Mono<List<TagInfo>> getTags(String kafkaCluster) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.GET(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/types/tagdefs")))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class)));
}

@@ -215,7 +215,7 @@ public Mono<TagEntities> getTopicWithTags(String kafkaCluster) {
* @param tagSpecs Tags to add
* @return Information about added tags
*/
public Mono<List<TagTopicInfo>> addTags(String kafkaCluster, List<TagTopicInfo> tagSpecs) {
public Mono<List<TagTopicInfo>> associateTags(String kafkaCluster, List<TagTopicInfo> tagSpecs) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.POST(URI.create(StringUtils.prependUri(
@@ -225,6 +225,21 @@ public Mono<List<TagTopicInfo>> addTags(String kafkaCluster, List<TagTopicInfo>
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class)));
}

/**
* Create tags.
*
* @param tags The list of tags to create
* @param kafkaCluster The Kafka cluster
* @return Information about created tags
*/
public Mono<List<TagInfo>> createTags(List<TagInfo> tags, String kafkaCluster) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.POST(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/types/tagdefs")), tags)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class)));
}
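
A minimal usage sketch for the new createTags client method, assuming an injected SchemaRegistryClient, an Slf4j logger, and an illustrative cluster name "local":

    // Sketch only: create a single tag definition on the cluster's Schema Registry.
    List<TagInfo> tags = List.of(TagInfo.builder().name("PII").build());

    schemaRegistryClient.createTags(tags, "local")
        .subscribe(
            createdTags -> log.info(String.format("Created tags %s.", createdTags)),
            error -> log.error("Error creating tags.", error));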

/**
* Delete a tag from a topic.
*
@@ -233,7 +248,7 @@ public Mono<List<TagTopicInfo>> addTags(String kafkaCluster, List<TagTopicInfo>
* @param tagName The tag to delete
* @return The resume response
*/
public Mono<HttpResponse<Void>> deleteTag(String kafkaCluster, String entityName, String tagName) {
public Mono<HttpResponse<Void>> dissociateTag(String kafkaCluster, String entityName, String tagName) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.DELETE(URI.create(StringUtils.prependUri(
@@ -8,6 +8,7 @@
import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient;
import com.michelin.ns4kafka.services.clients.schema.entities.TagEntities;
import com.michelin.ns4kafka.services.clients.schema.entities.TagEntity;
import com.michelin.ns4kafka.services.clients.schema.entities.TagInfo;
import com.michelin.ns4kafka.services.clients.schema.entities.TagTopicInfo;
import io.micronaut.context.annotation.EachBean;
import jakarta.inject.Singleton;
@@ -154,7 +155,7 @@ public void alterTags(List<Topic> ns4kafkaTopics, Map<String, Topic> brokerTopic
// Get tags to delete
Set<String> existingTags = new HashSet<>(brokerTopic.getSpec().getTags());
existingTags.removeAll(Set.copyOf(topic.getSpec().getTags()));
deleteTags(existingTags, topic.getMetadata().getName());
dissociateTags(existingTags, topic.getMetadata().getName());

// Get tags to create
Set<String> newTags = new HashSet<>(topic.getSpec().getTags());
@@ -173,7 +174,7 @@
.toList();

if (!tagsToCreate.isEmpty()) {
createTags(tagsToCreate);
createAndAssociateTags(tagsToCreate);
}
}
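
To make the set arithmetic in alterTags concrete, a small worked example with hypothetical tag lists:

    // Broker topic currently tagged [PII, INTERNAL]; ns4kafka declares [PII, FINANCE].
    Set<String> existingTags = new HashSet<>(List.of("PII", "INTERNAL"));
    existingTags.removeAll(Set.copyOf(List.of("PII", "FINANCE")));
    // existingTags is now [INTERNAL] -> dissociated from the topic.

    Set<String> newTags = new HashSet<>(List.of("PII", "FINANCE"));
    newTags.removeAll(Set.copyOf(List.of("PII", "INTERNAL")));
    // newTags is now [FINANCE] -> created if missing, then associated.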

@@ -191,7 +192,7 @@ public void deleteTopic(Topic topic) throws InterruptedException, ExecutionExcep
managedClusterProperties.getName());

if (isConfluentCloud() && !topic.getSpec().getTags().isEmpty()) {
deleteTags(topic.getSpec().getTags(), topic.getMetadata().getName());
dissociateTags(topic.getSpec().getTags(), topic.getMetadata().getName());
}
}

@@ -378,37 +379,54 @@ private void createTopics(List<Topic> topics) {
}

/**
* Create tags.
* Create tags and associate them.
*
* @param tagsToCreate The tags to create
* @param tagsToAssociate The tags to create and associate
*/
private void createTags(List<TagTopicInfo> tagsToCreate) {
String stringTags = String.join(", ", tagsToCreate
.stream()
.map(Record::toString)
.toList());
private void createAndAssociateTags(List<TagTopicInfo> tagsToAssociate) {
List<TagInfo> tagsToCreate = tagsToAssociate
.stream()
.map(tag -> TagInfo
.builder()
.name(tag.typeName())
.build())
.toList();

schemaRegistryClient.addTags(managedClusterProperties.getName(), tagsToCreate)
.subscribe(success -> log.info(String.format("Success creating tag %s.", stringTags)),
error -> log.error(String.format("Error creating tag %s.", stringTags), error));
String stringTags = String.join(", ", tagsToAssociate
.stream()
.map(Record::toString)
.toList());

schemaRegistryClient.createTags(tagsToCreate, managedClusterProperties.getName())
.subscribe(
successCreation ->
schemaRegistryClient.associateTags(
managedClusterProperties.getName(),
tagsToAssociate)
.subscribe(
successAssociation ->
log.info(String.format("Success associating tag %s.", stringTags)),
error ->
log.error(String.format("Error associating tag %s.", stringTags), error)),
error -> log.error(String.format("Error creating tag %s.", stringTags), error));
}
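
As a design note, the nested subscribe calls above could also be chained with Reactor's flatMap, which keeps a single subscription and a single error path. A sketch only, reusing the fields and local variables of the surrounding method:

    schemaRegistryClient.createTags(tagsToCreate, managedClusterProperties.getName())
        // Associate only once the tag definitions have been created.
        .flatMap(created -> schemaRegistryClient.associateTags(
            managedClusterProperties.getName(), tagsToAssociate))
        .subscribe(
            associated -> log.info(String.format("Success associating tag %s.", stringTags)),
            error -> log.error(String.format("Error creating or associating tag %s.", stringTags), error));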

/**
* Delete tags.
* Dissociate tags from a topic.
*
* @param tagsToDelete The tags to delete
* @param topicName The topic name
* @param tagsToDissociate The tags to dissociate
* @param topicName The topic name
*/
private void deleteTags(Collection<String> tagsToDelete, String topicName) {
tagsToDelete
.forEach(tag -> schemaRegistryClient.deleteTag(managedClusterProperties.getName(),
private void dissociateTags(Collection<String> tagsToDissociate, String topicName) {
tagsToDissociate
.forEach(tag -> schemaRegistryClient.dissociateTag(managedClusterProperties.getName(),
managedClusterProperties.getConfig().getProperty(CLUSTER_ID)
+ ":" + topicName, tag)
.subscribe(success -> log.info(String.format("Success deleting tag %s.",
.subscribe(success -> log.info(String.format("Success dissociating tag %s.",
managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":"
+ topicName
+ "/" + tag)),
error -> log.error(String.format("Error deleting tag %s.",
error -> log.error(String.format("Error dissociating tag %s.",
managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":"
+ topicName
+ "/" + tag), error)));
@@ -434,7 +434,7 @@ void shouldNotValidateTagsWhenNoNewTag() throws InterruptedException, ExecutionE
.name("test.topic")
.build())
.spec(Topic.TopicSpec.builder()
.tags(List.of("TAG1"))
.tags(Arrays.asList("TAG1"))
.replicationFactor(3)
.partitions(3)
.configs(Map.of("cleanup.policy", "delete",
Expand Down
91 changes: 15 additions & 76 deletions src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java
@@ -14,7 +14,6 @@
import com.michelin.ns4kafka.properties.ManagedClusterProperties;
import com.michelin.ns4kafka.repositories.TopicRepository;
import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient;
import com.michelin.ns4kafka.services.clients.schema.entities.TagInfo;
import com.michelin.ns4kafka.services.executors.TopicAsyncExecutor;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
@@ -23,6 +22,7 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
@@ -33,7 +33,6 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Mono;

@ExtendWith(MockitoExtension.class)
class TopicServiceTest {
@@ -886,26 +885,20 @@ void findAll() {

@Test
void shouldTagsBeValid() {
Namespace ns = Namespace.builder()
.metadata(ObjectMeta.builder()
.name("namespace")
.cluster("local")
.build())
.build();

Topic topic = Topic.builder()
.metadata(ObjectMeta.builder().name("ns-topic1").build())
.spec(Topic.TopicSpec.builder()
.tags(List.of("TAG_TEST")).build())
.build();

List<TagInfo> tagInfo = List.of(TagInfo.builder().name("TAG_TEST").build());

when(managedClusterProperties.stream()).thenReturn(Stream.of(
new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)));
when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(tagInfo));

List<String> validationErrors = topicService.validateTags(ns, topic);
ManagedClusterProperties managedClusterProps =
new ManagedClusterProperties("local",
ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD);
Properties properties = new Properties();
managedClusterProps.setConfig(properties);

when(managedClusterProperties.stream()).thenReturn(Stream.of(managedClusterProps));

List<String> validationErrors = topicService.validateTags(
Namespace.builder().metadata(
ObjectMeta.builder().name("namespace").cluster("local").build()).build(),
Topic.builder().metadata(
ObjectMeta.builder().name("ns-topic1").build()).spec(Topic.TopicSpec.builder()
.tags(List.of("TAG_TEST")).build()).build());
assertEquals(0, validationErrors.size());
}

@@ -931,58 +924,4 @@ void shouldTagsBeInvalidWhenNotConfluentCloud() {
assertEquals(1, validationErrors.size());
assertEquals("Invalid value TAG_TEST for tags: Tags are not currently supported.", validationErrors.get(0));
}

@Test
void shouldTagsBeInvalidWhenNoTagsAllowed() {
Namespace ns = Namespace.builder()
.metadata(ObjectMeta.builder()
.name("namespace")
.cluster("local")
.build())
.build();

Topic topic = Topic.builder()
.metadata(ObjectMeta.builder().name("ns-topic1").build())
.spec(Topic.TopicSpec.builder()
.tags(List.of("TAG_TEST")).build())
.build();

when(managedClusterProperties.stream()).thenReturn(Stream.of(
new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)));
when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(Collections.emptyList()));

List<String> validationErrors = topicService.validateTags(ns, topic);
assertEquals(1, validationErrors.size());
assertEquals(
"Invalid value TAG_TEST for tags: No tags allowed.",
validationErrors.get(0));
}

@Test
void shouldTagsBeInvalidWhenNotAllowed() {
Namespace ns = Namespace.builder()
.metadata(ObjectMeta.builder()
.name("namespace")
.cluster("local")
.build())
.build();

Topic topic = Topic.builder()
.metadata(ObjectMeta.builder().name("ns-topic1").build())
.spec(Topic.TopicSpec.builder()
.tags(List.of("BAD_TAG", "TAG_TEST")).build())
.build();

List<TagInfo> tagInfo = List.of(TagInfo.builder().name("TAG_TEST").build());

when(managedClusterProperties.stream())
.thenReturn(Stream.of(
new ManagedClusterProperties("local",
ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD)));
when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(tagInfo));

List<String> validationErrors = topicService.validateTags(ns, topic);
assertEquals(1, validationErrors.size());
assertEquals("Invalid value BAD_TAG for tags: Available tags are TAG_TEST.", validationErrors.get(0));
}
}