diff --git a/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java index 4951a093..2b43f9d3 100644 --- a/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java @@ -2,11 +2,14 @@ import static com.michelin.ns4kafka.services.executors.TopicAsyncExecutor.CLUSTER_ID; import static com.michelin.ns4kafka.services.executors.TopicAsyncExecutor.TOPIC_ENTITY_TYPE; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -14,9 +17,15 @@ import com.michelin.ns4kafka.models.Topic; import com.michelin.ns4kafka.properties.ManagedClusterProperties; import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient; +import com.michelin.ns4kafka.services.clients.schema.entities.TagTopicInfo; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DeleteTopicsResult; +import org.apache.kafka.common.KafkaFuture; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; @@ -39,6 +48,15 @@ class TopicAsyncExecutorTest { @Mock ManagedClusterProperties managedClusterProperties; + @Mock + Admin adminClient; + + @Mock + DeleteTopicsResult deleteTopicsResult; + + @Mock + KafkaFuture kafkaFuture; + @InjectMocks TopicAsyncExecutor topicAsyncExecutor; @@ -123,4 +141,122 @@ void shouldBeConfluentCloud() { when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); assertTrue(topicAsyncExecutor.isConfluentCloud()); } + + @Test + void shouldDeleteTopicNoTags() throws ExecutionException, InterruptedException, TimeoutException { + when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); + when(deleteTopicsResult.all()).thenReturn(kafkaFuture); + when(adminClient.deleteTopics(anyList())).thenReturn(deleteTopicsResult); + when(managedClusterProperties.getAdminClient()).thenReturn(adminClient); + + Topic topic = Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .build()) + .build(); + + topicAsyncExecutor.deleteTopic(topic); + + verify(schemaRegistryClient, never()).deleteTag(any(), any(), any()); + } + + @Test + void shouldDeleteTopicSelfManagedCluster() throws ExecutionException, InterruptedException, TimeoutException { + when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.SELF_MANAGED); + when(deleteTopicsResult.all()).thenReturn(kafkaFuture); + when(adminClient.deleteTopics(anyList())).thenReturn(deleteTopicsResult); + when(managedClusterProperties.getAdminClient()).thenReturn(adminClient); + + Topic topic = Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .build()) + .build(); + + topicAsyncExecutor.deleteTopic(topic); + + verify(schemaRegistryClient, never()).deleteTag(any(), any(), any()); + } + + @Test + void shouldDeleteTopicAndTags() throws ExecutionException, InterruptedException, TimeoutException { + Properties properties = new Properties(); + properties.put(CLUSTER_ID, CLUSTER_ID_TEST); + + when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); + when(deleteTopicsResult.all()).thenReturn(kafkaFuture); + when(adminClient.deleteTopics(anyList())).thenReturn(deleteTopicsResult); + when(managedClusterProperties.getAdminClient()).thenReturn(adminClient); + when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER); + when(managedClusterProperties.getConfig()).thenReturn(properties); + when(schemaRegistryClient.deleteTag(anyString(), + anyString(), anyString())) + .thenReturn(Mono.empty()) + .thenReturn(Mono.error(new Exception("error"))); + + Topic topic = Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .tags(List.of(TAG1)) + .build()) + .build(); + + topicAsyncExecutor.deleteTopic(topic); + + verify(schemaRegistryClient).deleteTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG1); + } + + @Test + void shouldNotEnrichWithTagsWhenNotConfluentCloud() { + when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.SELF_MANAGED); + + Map brokerTopics = Map.of(TOPIC_NAME, + Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .build()) + .build()); + + topicAsyncExecutor.enrichWithTags(brokerTopics); + + assertTrue(brokerTopics.get(TOPIC_NAME).getSpec().getTags().isEmpty()); + } + + @Test + void shouldEnrichWithTagsWhenConfluentCloud() { + Properties properties = new Properties(); + properties.put(CLUSTER_ID, CLUSTER_ID_TEST); + + when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); + when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER); + when(managedClusterProperties.getConfig()).thenReturn(properties); + + TagTopicInfo tagTopicInfo = TagTopicInfo.builder() + .typeName("typeName") + .build(); + + when(schemaRegistryClient.getTopicWithTags(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME)) + .thenReturn(Mono.just(List.of(tagTopicInfo))); + + Map brokerTopics = Map.of(TOPIC_NAME, + Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .build()) + .build()); + + topicAsyncExecutor.enrichWithTags(brokerTopics); + + assertEquals("typeName", brokerTopics.get(TOPIC_NAME).getSpec().getTags().get(0)); + } }