Skip to content

Commit

Permalink
Improve tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Loïc Greffier committed Oct 8, 2023
1 parent 493a950 commit 96aab50
Showing 1 changed file with 136 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,30 @@

import static com.michelin.ns4kafka.services.executors.TopicAsyncExecutor.CLUSTER_ID;
import static com.michelin.ns4kafka.services.executors.TopicAsyncExecutor.TOPIC_ENTITY_TYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.michelin.ns4kafka.models.ObjectMeta;
import com.michelin.ns4kafka.models.Topic;
import com.michelin.ns4kafka.properties.ManagedClusterProperties;
import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient;
import com.michelin.ns4kafka.services.clients.schema.entities.TagTopicInfo;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.common.KafkaFuture;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
Expand All @@ -39,6 +48,15 @@ class TopicAsyncExecutorTest {
@Mock
ManagedClusterProperties managedClusterProperties;

@Mock
Admin adminClient;

@Mock
DeleteTopicsResult deleteTopicsResult;

@Mock
KafkaFuture<Void> kafkaFuture;

@InjectMocks
TopicAsyncExecutor topicAsyncExecutor;

Expand Down Expand Up @@ -123,4 +141,122 @@ void shouldBeConfluentCloud() {
when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD);
assertTrue(topicAsyncExecutor.isConfluentCloud());
}

@Test
void shouldDeleteTopicNoTags() throws ExecutionException, InterruptedException, TimeoutException {
when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD);
when(deleteTopicsResult.all()).thenReturn(kafkaFuture);
when(adminClient.deleteTopics(anyList())).thenReturn(deleteTopicsResult);
when(managedClusterProperties.getAdminClient()).thenReturn(adminClient);

Topic topic = Topic.builder()
.metadata(ObjectMeta.builder()
.name(TOPIC_NAME)
.build())
.spec(Topic.TopicSpec.builder()
.build())
.build();

topicAsyncExecutor.deleteTopic(topic);

verify(schemaRegistryClient, never()).deleteTag(any(), any(), any());
}

@Test
void shouldDeleteTopicSelfManagedCluster() throws ExecutionException, InterruptedException, TimeoutException {
when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.SELF_MANAGED);
when(deleteTopicsResult.all()).thenReturn(kafkaFuture);
when(adminClient.deleteTopics(anyList())).thenReturn(deleteTopicsResult);
when(managedClusterProperties.getAdminClient()).thenReturn(adminClient);

Topic topic = Topic.builder()
.metadata(ObjectMeta.builder()
.name(TOPIC_NAME)
.build())
.spec(Topic.TopicSpec.builder()
.build())
.build();

topicAsyncExecutor.deleteTopic(topic);

verify(schemaRegistryClient, never()).deleteTag(any(), any(), any());
}

@Test
void shouldDeleteTopicAndTags() throws ExecutionException, InterruptedException, TimeoutException {
Properties properties = new Properties();
properties.put(CLUSTER_ID, CLUSTER_ID_TEST);

when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD);
when(deleteTopicsResult.all()).thenReturn(kafkaFuture);
when(adminClient.deleteTopics(anyList())).thenReturn(deleteTopicsResult);
when(managedClusterProperties.getAdminClient()).thenReturn(adminClient);
when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER);
when(managedClusterProperties.getConfig()).thenReturn(properties);
when(schemaRegistryClient.deleteTag(anyString(),
anyString(), anyString()))
.thenReturn(Mono.empty())
.thenReturn(Mono.error(new Exception("error")));

Topic topic = Topic.builder()
.metadata(ObjectMeta.builder()
.name(TOPIC_NAME)
.build())
.spec(Topic.TopicSpec.builder()
.tags(List.of(TAG1))
.build())
.build();

topicAsyncExecutor.deleteTopic(topic);

verify(schemaRegistryClient).deleteTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG1);
}

@Test
void shouldNotEnrichWithTagsWhenNotConfluentCloud() {
when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.SELF_MANAGED);

Map<String, Topic> brokerTopics = Map.of(TOPIC_NAME,
Topic.builder()
.metadata(ObjectMeta.builder()
.name(TOPIC_NAME)
.build())
.spec(Topic.TopicSpec.builder()
.build())
.build());

topicAsyncExecutor.enrichWithTags(brokerTopics);

assertTrue(brokerTopics.get(TOPIC_NAME).getSpec().getTags().isEmpty());
}

@Test
void shouldEnrichWithTagsWhenConfluentCloud() {
Properties properties = new Properties();
properties.put(CLUSTER_ID, CLUSTER_ID_TEST);

when(managedClusterProperties.getProvider()).thenReturn(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD);
when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER);
when(managedClusterProperties.getConfig()).thenReturn(properties);

TagTopicInfo tagTopicInfo = TagTopicInfo.builder()
.typeName("typeName")
.build();

when(schemaRegistryClient.getTopicWithTags(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME))
.thenReturn(Mono.just(List.of(tagTopicInfo)));

Map<String, Topic> brokerTopics = Map.of(TOPIC_NAME,
Topic.builder()
.metadata(ObjectMeta.builder()
.name(TOPIC_NAME)
.build())
.spec(Topic.TopicSpec.builder()
.build())
.build());

topicAsyncExecutor.enrichWithTags(brokerTopics);

assertEquals("typeName", brokerTopics.get(TOPIC_NAME).getSpec().getTags().get(0));
}
}

0 comments on commit 96aab50

Please sign in to comment.