Skip to content

Commit

Permalink
Manage Confluent tags
Browse files Browse the repository at this point in the history
  • Loading branch information
adriencalime committed Oct 3, 2023
1 parent a7577ea commit 2888436
Showing 1 changed file with 124 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,130 @@ void updateTopic() throws InterruptedException, ExecutionException, TimeoutExcep
assertEquals("test.topic", actual.getMetadata().getName());
}

/**
* Validate topic update with two new tags
* @throws InterruptedException Any interrupted exception
* @throws ExecutionException Any execution exception
* @throws TimeoutException Any timeout exception
*/
@Test
void updateTopicWithNewTags() throws InterruptedException, ExecutionException, TimeoutException {
Namespace ns = Namespace.builder()
.metadata(ObjectMeta.builder()
.name("test")
.cluster("local")
.build())
.spec(NamespaceSpec.builder()
.topicValidator(TopicValidator.makeDefault())
.build())
.build();

Topic existing = Topic.builder()
.metadata(ObjectMeta.builder()
.name("test.topic")
.build())
.spec(Topic.TopicSpec.builder()
.replicationFactor(3)
.partitions(3)
.configs(Map.of("cleanup.policy","compact",
"min.insync.replicas", "2",
"retention.ms", "60000"))
.build())
.build();

Topic topic = Topic.builder()
.metadata(ObjectMeta.builder()
.name("test.topic")
.build())
.spec(Topic.TopicSpec.builder()
.tags(Arrays.asList("TAG1", "TAG2"))
.replicationFactor(3)
.partitions(3)
.configs(Map.of("cleanup.policy","delete",
"min.insync.replicas", "2",
"retention.ms", "60000"))
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(topicService.findByName(ns, "test.topic")).thenReturn(Optional.of(existing));
when(topicService.create(topic)).thenReturn(topic);
when(securityService.username()).thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
doNothing().when(applicationEventPublisher).publishEvent(any());

var response = topicController.apply("test", topic, false);
Topic actual = response.body();
assertEquals("changed", response.header("X-Ns4kafka-Result"));
assertEquals("test.topic", actual.getMetadata().getName());
assertEquals(2, actual.getSpec().getTags().size());
assertEquals("TAG1", actual.getSpec().getTags().get(0));
assertEquals("TAG2", actual.getSpec().getTags().get(1));
}

/**
* Validate topic update with a tag to delete
* @throws InterruptedException Any interrupted exception
* @throws ExecutionException Any execution exception
* @throws TimeoutException Any timeout exception
*/
@Test
void updateTopicWithTagToDelete() throws InterruptedException, ExecutionException, TimeoutException {
Namespace ns = Namespace.builder()
.metadata(ObjectMeta.builder()
.name("test")
.cluster("local")
.build())
.spec(NamespaceSpec.builder()
.topicValidator(TopicValidator.makeDefault())
.build())
.build();

Topic existing = Topic.builder()
.metadata(ObjectMeta.builder()
.name("test.topic")
.build())
.spec(Topic.TopicSpec.builder()
.tags(Arrays.asList("TAG1", "TAG2"))
.replicationFactor(3)
.partitions(3)
.configs(Map.of("cleanup.policy","compact",
"min.insync.replicas", "2",
"retention.ms", "60000"))
.build())
.build();

Topic topic = Topic.builder()
.metadata(ObjectMeta.builder()
.name("test.topic")
.build())
.spec(Topic.TopicSpec.builder()
.tags(List.of("TAG1"))
.replicationFactor(3)
.partitions(3)
.configs(Map.of("cleanup.policy","delete",
"min.insync.replicas", "2",
"retention.ms", "60000"))
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(topicService.findByName(ns, "test.topic")).thenReturn(Optional.of(existing));
when(topicService.create(topic)).thenReturn(topic);
when(securityService.username()).thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
doNothing().when(applicationEventPublisher).publishEvent(any());

var response = topicController.apply("test", topic, false);
Topic actual = response.body();
assertEquals("changed", response.header("X-Ns4kafka-Result"));
assertEquals("test.topic", actual.getMetadata().getName());
assertEquals(1, actual.getSpec().getTags().size());
assertEquals("TAG1", actual.getSpec().getTags().get(0));
}

/**
* Validate topic update when there are validations errors
*/
Expand Down

0 comments on commit 2888436

Please sign in to comment.