Skip to content

Commit

Permalink
Fix topic with Confluent tag being recreated after deletion (#400)
Browse files Browse the repository at this point in the history
* commit test

* replace get(0) by getFirst()

---------

Co-authored-by: thcai <[email protected]>
  • Loading branch information
ThomasCAI-mlv and ThomasCAI-mlv authored Jun 20, 2024
1 parent e093530 commit 64bf07f
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,6 @@ public void deleteTopic(Topic topic) throws InterruptedException, ExecutionExcep

log.info("Success deleting topic {} on {}", topic.getMetadata().getName(),
managedClusterProperties.getName());

if (managedClusterProperties.isConfluentCloud() && !topic.getSpec().getTags().isEmpty()) {
dissociateTags(topic.getSpec().getTags(), topic);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ void shouldCreateAndAssociateTags() {

@Test
void shouldDeleteTopicNoTags() throws ExecutionException, InterruptedException, TimeoutException {
when(managedClusterProperties.isConfluentCloud()).thenReturn(true);
when(deleteTopicsResult.all()).thenReturn(kafkaFuture);
when(adminClient.deleteTopics(anyList())).thenReturn(deleteTopicsResult);
when(managedClusterProperties.getAdminClient()).thenReturn(adminClient);
Expand All @@ -293,44 +292,15 @@ void shouldDeleteTopicNoTags() throws ExecutionException, InterruptedException,

topicAsyncExecutor.deleteTopic(topic);

verify(schemaRegistryClient, never()).dissociateTag(any(), any(), any());
}

@Test
void shouldDeleteTopicSelfManagedCluster() throws ExecutionException, InterruptedException, TimeoutException {
when(managedClusterProperties.isConfluentCloud()).thenReturn(false);
when(deleteTopicsResult.all()).thenReturn(kafkaFuture);
when(adminClient.deleteTopics(anyList())).thenReturn(deleteTopicsResult);
when(managedClusterProperties.getAdminClient()).thenReturn(adminClient);

Topic topic = Topic.builder()
.metadata(Metadata.builder()
.name(TOPIC_NAME)
.build())
.spec(Topic.TopicSpec.builder()
.build())
.build();

topicAsyncExecutor.deleteTopic(topic);

verify(schemaRegistryClient, never()).dissociateTag(any(), any(), any());
verify(adminClient).deleteTopics(anyList());
}

@Test
void shouldDeleteTopicAndTags() throws ExecutionException, InterruptedException, TimeoutException {
Properties properties = new Properties();
properties.put(CLUSTER_ID, CLUSTER_ID_TEST);

when(managedClusterProperties.isConfluentCloud()).thenReturn(true);
when(deleteTopicsResult.all()).thenReturn(kafkaFuture);
when(adminClient.deleteTopics(anyList())).thenReturn(deleteTopicsResult);
when(managedClusterProperties.getAdminClient()).thenReturn(adminClient);
when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER);
when(managedClusterProperties.getConfig()).thenReturn(properties);
when(schemaRegistryClient.dissociateTag(anyString(),
anyString(), anyString()))
.thenReturn(Mono.just(HttpResponse.ok()))
.thenReturn(Mono.error(new Exception("error")));

Topic topic = Topic.builder()
.metadata(Metadata.builder()
Expand All @@ -343,7 +313,7 @@ void shouldDeleteTopicAndTags() throws ExecutionException, InterruptedException,

topicAsyncExecutor.deleteTopic(topic);

verify(schemaRegistryClient).dissociateTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG1);
verify(adminClient).deleteTopics(anyList());
}

@Test
Expand Down

0 comments on commit 64bf07f

Please sign in to comment.