diff --git a/src/main/java/com/michelin/ns4kafka/property/ConfluentCloudProperties.java b/src/main/java/com/michelin/ns4kafka/property/ConfluentCloudProperties.java
new file mode 100644
index 00000000..5350ec2d
--- /dev/null
+++ b/src/main/java/com/michelin/ns4kafka/property/ConfluentCloudProperties.java
@@ -0,0 +1,25 @@
+package com.michelin.ns4kafka.property;
+
+import io.micronaut.context.annotation.ConfigurationProperties;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Confluent Cloud properties.
+ */
+@Getter
+@Setter
+@ConfigurationProperties("ns4kafka.confluent-cloud")
+public class ConfluentCloudProperties {
+    private StreamCatalogProperties streamCatalog;
+
+    /**
+     * Stream Catalog properties.
+     */
+    @Getter
+    @Setter
+    @ConfigurationProperties("stream-catalog")
+    public static class StreamCatalogProperties {
+        int pageSize = 500;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java
index 0ea41405..a0c9f54a 100644
--- a/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java
+++ b/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java
@@ -2,6 +2,7 @@
 import com.michelin.ns4kafka.model.Metadata;
 import com.michelin.ns4kafka.model.Topic;
+import com.michelin.ns4kafka.property.ConfluentCloudProperties;
 import com.michelin.ns4kafka.property.ManagedClusterProperties;
 import com.michelin.ns4kafka.repository.TopicRepository;
 import com.michelin.ns4kafka.repository.kafka.KafkaStoreException;
@@ -62,6 +63,8 @@ public class TopicAsyncExecutor {
 
     private SchemaRegistryClient schemaRegistryClient;
 
+    private ConfluentCloudProperties confluentCloudProperties;
+
     private Admin getAdminClient() {
         return managedClusterProperties.getAdminClient();
     }
@@ -111,18 +114,17 @@ public void synchronizeTopics() {
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
             if (!createTopics.isEmpty()) {
-                log.debug("Topic(s) to create: "
-                    + String.join(", ", createTopics.stream().map(topic -> topic.getMetadata().getName()).toList()));
+                log.debug("Topic(s) to create: {}", String.join(", ",
+                    createTopics.stream().map(topic -> topic.getMetadata().getName()).toList()));
             }
 
             if (!updateTopics.isEmpty()) {
-                log.debug("Topic(s) to update: "
-                    + String.join(", ", updateTopics.keySet().stream().map(ConfigResource::name).toList()));
+                log.debug("Topic(s) to update: {}", String.join(", ",
+                    updateTopics.keySet().stream().map(ConfigResource::name).toList()));
                 for (Map.Entry<ConfigResource, Collection<AlterConfigOp>> e : updateTopics.entrySet()) {
                     for (AlterConfigOp op : e.getValue()) {
-                        log.debug(
-                            e.getKey().name() + " " + op.opType().toString() + " " + op.configEntry().name() + "("
-                                + op.configEntry().value() + ")");
+                        log.debug("{} {} {}({})", e.getKey().name(), op.opType().toString(),
+                            op.configEntry().name(), op.configEntry().value());
                     }
                 }
             }
@@ -271,7 +273,7 @@ public void enrichWithCatalogInfo(Map<String, Topic> topics) {
 
         // getting list of topics by managing offset & limit
         int offset = 0;
-        int limit = 5000;
+        int limit = confluentCloudProperties.getStreamCatalog().getPageSize();
         do {
             topicListResponse = schemaRegistryClient.getTopicWithCatalogInfo(
                 managedClusterProperties.getName(), limit, offset).block();
@@ -332,7 +334,7 @@ public Map<String, Topic> collectBrokerTopicsFromNames(List<String> topicNames)
                     .build())
                 .spec(Topic.TopicSpec.builder()
                     .replicationFactor(
-                        topicDescriptions.get(stringMapEntry.getKey()).partitions().get(0).replicas().size())
+                        topicDescriptions.get(stringMapEntry.getKey()).partitions().getFirst().replicas().size())
                     .partitions(topicDescriptions.get(stringMapEntry.getKey()).partitions().size())
                     .configs(stringMapEntry.getValue())
                     .build())
diff --git a/src/test/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutorTest.java b/src/test/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutorTest.java
index c614be32..eba27fc7 100644
--- a/src/test/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutorTest.java
+++ b/src/test/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutorTest.java
@@ -14,6 +14,8 @@
 import com.michelin.ns4kafka.model.Metadata;
 import com.michelin.ns4kafka.model.Topic;
+import com.michelin.ns4kafka.property.ConfluentCloudProperties;
+import com.michelin.ns4kafka.property.ConfluentCloudProperties.StreamCatalogProperties;
 import com.michelin.ns4kafka.property.ManagedClusterProperties;
 import com.michelin.ns4kafka.repository.TopicRepository;
 import com.michelin.ns4kafka.service.client.schema.SchemaRegistryClient;
@@ -59,6 +61,12 @@ class TopicAsyncExecutorTest {
     @Mock
     ManagedClusterProperties managedClusterProperties;
 
+    @Mock
+    ConfluentCloudProperties confluentCloudProperties;
+
+    @Mock
+    StreamCatalogProperties streamCatalogProperties;
+
     @Mock
     TopicRepository topicRepository;
 
@@ -428,6 +436,10 @@ void shouldNotEnrichWithCatalogInfoWhenNotConfluentCloud() {
     void shouldEnrichWithCatalogInfoWhenConfluentCloud() {
         when(managedClusterProperties.isConfluentCloud()).thenReturn(true);
         when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER);
+        when(confluentCloudProperties.getStreamCatalog()).thenReturn(streamCatalogProperties);
+        when(streamCatalogProperties.getPageSize()).thenReturn(500);
+
+        int limit = 500;
 
         TopicEntity entity = TopicEntity.builder()
             .classificationNames(List.of(TAG1))
@@ -439,9 +451,9 @@ void shouldEnrichWithCatalogInfoWhenConfluentCloud() {
         TopicListResponse response1 = TopicListResponse.builder().entities(List.of(entity)).build();
         TopicListResponse response2 = TopicListResponse.builder().entities(List.of()).build();
 
-        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, 5000, 0))
+        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, limit, 0))
             .thenReturn(Mono.just(response1));
-        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, 5000, 5000))
+        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, limit, limit))
             .thenReturn(Mono.just(response2));
 
         Map<String, Topic> brokerTopics = Map.of(
@@ -463,6 +475,10 @@ void shouldEnrichWithCatalogInfoForMultipleTopics() {
         when(managedClusterProperties.isConfluentCloud()).thenReturn(true);
         when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER);
+        when(confluentCloudProperties.getStreamCatalog()).thenReturn(streamCatalogProperties);
+        when(streamCatalogProperties.getPageSize()).thenReturn(500);
+
+        int limit = 500;
 
         TopicEntity entity1 = TopicEntity.builder()
             .classificationNames(List.of())
@@ -500,9 +516,9 @@ void shouldEnrichWithCatalogInfoForMultipleTopics() {
             .entities(List.of(entity1, entity2, entity3, entity4)).build();
         TopicListResponse response2 = TopicListResponse.builder().entities(List.of()).build();
 
-        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, 5000, 0))
+        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, limit, 0))
            .thenReturn(Mono.just(response1));
-        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, 5000, 5000))
+        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, limit, limit))
            .thenReturn(Mono.just(response2));
 
         Map<String, Topic> brokerTopics = Map.of(
@@ -553,6 +569,8 @@ void shouldEnrichWithCatalogInfoForMultipleTopics() {
     void shouldEnrichWithCatalogInfoWhenConfluentCloudAndResponseIsNull() {
         when(managedClusterProperties.isConfluentCloud()).thenReturn(true);
        when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER);
+        when(confluentCloudProperties.getStreamCatalog()).thenReturn(streamCatalogProperties);
+        when(streamCatalogProperties.getPageSize()).thenReturn(500);
         when(schemaRegistryClient.getTopicWithCatalogInfo(anyString(), any(Integer.class), any(Integer.class)))
             .thenReturn(Mono.empty());
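Note on the new configuration surface: with Micronaut's standard kebab-case binding for `@ConfigurationProperties`, the page size introduced above should be overridable from `application.yml`. A minimal sketch, assuming the default property-path convention (the key path follows directly from the `ns4kafka.confluent-cloud` and `stream-catalog` annotations; the value shown is illustrative):

```yaml
# Hypothetical application.yml override of the Stream Catalog page size.
# When the key is absent, the field initializer keeps the default of 500.
ns4kafka:
  confluent-cloud:
    stream-catalog:
      page-size: 1000
```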
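For context on `enrichWithCatalogInfo()`: the change swaps the hard-coded `limit = 5000` for the configured page size inside an offset/limit loop. Below is a self-contained sketch of that paging pattern; `CatalogPage` and `fetchPage` are hypothetical stand-ins, not the ns4kafka API (the real loop calls `schemaRegistryClient.getTopicWithCatalogInfo(cluster, limit, offset)` and stops once a page comes back empty):

```java
import java.util.ArrayList;
import java.util.List;

public class PagingSketch {

    // Stand-in for one Stream Catalog page of topic entities (hypothetical).
    record CatalogPage(List<String> entities) {
        boolean isEmpty() {
            return entities.isEmpty();
        }
    }

    // Hypothetical fetch: returns up to `limit` items starting at `offset`,
    // draining a simulated source of `total` entries.
    static CatalogPage fetchPage(int limit, int offset, int total) {
        List<String> entities = new ArrayList<>();
        for (int i = offset; i < Math.min(offset + limit, total); i++) {
            entities.add("topic-" + i);
        }
        return new CatalogPage(entities);
    }

    public static void main(String[] args) {
        // Would come from confluentCloudProperties.getStreamCatalog().getPageSize().
        int limit = 500;
        int offset = 0;
        List<String> all = new ArrayList<>();

        CatalogPage page;
        do {
            page = fetchPage(limit, offset, 1200); // 1200 simulated catalog entries
            all.addAll(page.entities());
            offset += limit; // advance one page, mirroring the executor's loop
        } while (!page.isEmpty());

        System.out.println("Fetched " + all.size() + " entities"); // Fetched 1200 entities
    }
}
```

The test stubs above mirror this termination condition: the first stub returns a populated page at offset 0, and the second returns an empty page at offset = limit, which ends the loop.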