diff --git a/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java b/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java index 89d63d66..e529fe4f 100644 --- a/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java +++ b/src/main/java/com/michelin/ns4kafka/controllers/topic/TopicController.java @@ -108,6 +108,7 @@ public HttpResponse apply(String namespace, @Valid @Body Topic topic, validationErrors.addAll(topicService.validateTopicUpdate(ns, existingTopic.get(), topic)); } + topic.getSpec().getTags().replaceAll(String::toUpperCase); List existingTags = existingTopic .map(oldTopic -> oldTopic.getSpec().getTags()) .orElse(Collections.emptyList()); diff --git a/src/main/java/com/michelin/ns4kafka/services/TopicService.java b/src/main/java/com/michelin/ns4kafka/services/TopicService.java index fb8479a5..27480da2 100644 --- a/src/main/java/com/michelin/ns4kafka/services/TopicService.java +++ b/src/main/java/com/michelin/ns4kafka/services/TopicService.java @@ -45,9 +45,6 @@ public class TopicService { @Inject List managedClusterProperties; - @Inject - SchemaRegistryClient schemaRegistryClient; - /** * Find all topics. * @@ -348,28 +345,6 @@ public List validateTags(Namespace namespace, Topic topic) { return validationErrors; } - Set tagNames = schemaRegistryClient.getTags(namespace.getMetadata().getCluster()) - .map(tags -> tags.stream().map(TagInfo::name).collect(Collectors.toSet())).block(); - - if (tagNames == null || tagNames.isEmpty()) { - validationErrors.add(String.format( - "Invalid value %s for tags: No tags allowed.", - String.join(", ", topic.getSpec().getTags()))); - return validationErrors; - } - - List unavailableTagNames = topic.getSpec().getTags() - .stream() - .filter(tagName -> !tagNames.contains(tagName)) - .toList(); - - if (!unavailableTagNames.isEmpty()) { - validationErrors.add(String.format( - "Invalid value %s for tags: Available tags are %s.", - String.join(", ", unavailableTagNames), - String.join(", ", tagNames))); - } - return validationErrors; } } diff --git a/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java b/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java index dcb2c7a5..114f5ff3 100644 --- a/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java +++ b/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java @@ -186,9 +186,9 @@ public Mono deleteCurrentCompatibilityBySubject(Str public Mono> getTags(String kafkaCluster) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); HttpRequest request = HttpRequest - .GET(URI.create(StringUtils.prependUri( - config.getUrl(), "/catalog/v1/types/tagdefs"))) - .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + .GET(URI.create(StringUtils.prependUri( + config.getUrl(), "/catalog/v1/types/tagdefs"))) + .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class))); } @@ -215,7 +215,7 @@ public Mono getTopicWithTags(String kafkaCluster) { * @param tagSpecs Tags to add * @return Information about added tags */ - public Mono> addTags(String kafkaCluster, List tagSpecs) { + public Mono> associateTags(String kafkaCluster, List tagSpecs) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); HttpRequest request = HttpRequest .POST(URI.create(StringUtils.prependUri( @@ -225,6 +225,21 @@ public Mono> addTags(String kafkaCluster, List return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class))); } + /** + * Create tags. + * + * @param tags The list of tags to create + * @param kafkaCluster The Kafka cluster + * @return Information about created tags + */ + public Mono> createTags(List tags, String kafkaCluster) { + ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); + HttpRequest request = HttpRequest.POST(URI.create(StringUtils.prependUri( + config.getUrl(), "/catalog/v1/types/tagdefs")), tags) + .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class))); + } + /** * Delete a tag to a topic. * @@ -233,7 +248,7 @@ public Mono> addTags(String kafkaCluster, List * @param tagName The tag to delete * @return The resume response */ - public Mono> deleteTag(String kafkaCluster, String entityName, String tagName) { + public Mono> dissociateTag(String kafkaCluster, String entityName, String tagName) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); HttpRequest request = HttpRequest .DELETE(URI.create(StringUtils.prependUri( diff --git a/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java index 3a0ba80d..442e6d1e 100644 --- a/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java +++ b/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java @@ -8,6 +8,7 @@ import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient; import com.michelin.ns4kafka.services.clients.schema.entities.TagEntities; import com.michelin.ns4kafka.services.clients.schema.entities.TagEntity; +import com.michelin.ns4kafka.services.clients.schema.entities.TagInfo; import com.michelin.ns4kafka.services.clients.schema.entities.TagTopicInfo; import io.micronaut.context.annotation.EachBean; import jakarta.inject.Singleton; @@ -154,7 +155,7 @@ public void alterTags(List ns4kafkaTopics, Map brokerTopic // Get tags to delete Set existingTags = new HashSet<>(brokerTopic.getSpec().getTags()); existingTags.removeAll(Set.copyOf(topic.getSpec().getTags())); - deleteTags(existingTags, topic.getMetadata().getName()); + dissociateTags(existingTags, topic.getMetadata().getName()); // Get tags to create Set newTags = new HashSet<>(topic.getSpec().getTags()); @@ -173,7 +174,7 @@ public void alterTags(List ns4kafkaTopics, Map brokerTopic .toList(); if (!tagsToCreate.isEmpty()) { - createTags(tagsToCreate); + createAndAssociateTags(tagsToCreate); } } @@ -191,7 +192,7 @@ public void deleteTopic(Topic topic) throws InterruptedException, ExecutionExcep managedClusterProperties.getName()); if (isConfluentCloud() && !topic.getSpec().getTags().isEmpty()) { - deleteTags(topic.getSpec().getTags(), topic.getMetadata().getName()); + dissociateTags(topic.getSpec().getTags(), topic.getMetadata().getName()); } } @@ -378,37 +379,54 @@ private void createTopics(List topics) { } /** - * Create tags. + * Create tags and associate them. * - * @param tagsToCreate The tags to create + * @param tagsToAssociate The tags to create and associate */ - private void createTags(List tagsToCreate) { - String stringTags = String.join(", ", tagsToCreate - .stream() - .map(Record::toString) - .toList()); + private void createAndAssociateTags(List tagsToAssociate) { + List tagsToCreate = tagsToAssociate + .stream() + .map(tag -> TagInfo + .builder() + .name(tag.typeName()) + .build()) + .toList(); - schemaRegistryClient.addTags(managedClusterProperties.getName(), tagsToCreate) - .subscribe(success -> log.info(String.format("Success creating tag %s.", stringTags)), - error -> log.error(String.format("Error creating tag %s.", stringTags), error)); + String stringTags = String.join(", ", tagsToAssociate + .stream() + .map(Record::toString) + .toList()); + + schemaRegistryClient.createTags(tagsToCreate, managedClusterProperties.getName()) + .subscribe( + successCreation -> + schemaRegistryClient.associateTags( + managedClusterProperties.getName(), + tagsToAssociate) + .subscribe( + successAssociation -> + log.info(String.format("Success associating tag %s.", stringTags)), + error -> + log.error(String.format("Error associating tag %s.", stringTags), error)), + error -> log.error(String.format("Error creating tag %s.", stringTags), error)); } /** - * Delete tags. + * Dissociate tags to a topic. * - * @param tagsToDelete The tags to delete - * @param topicName The topic name + * @param tagsToDissociate The tags to dissociate + * @param topicName The topic name */ - private void deleteTags(Collection tagsToDelete, String topicName) { - tagsToDelete - .forEach(tag -> schemaRegistryClient.deleteTag(managedClusterProperties.getName(), + private void dissociateTags(Collection tagsToDissociate, String topicName) { + tagsToDissociate + .forEach(tag -> schemaRegistryClient.dissociateTag(managedClusterProperties.getName(), managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":" + topicName, tag) - .subscribe(success -> log.info(String.format("Success deleting tag %s.", + .subscribe(success -> log.info(String.format("Success dissociating tag %s.", managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":" + topicName + "/" + tag)), - error -> log.error(String.format("Error deleting tag %s.", + error -> log.error(String.format("Error dissociating tag %s.", managedClusterProperties.getConfig().getProperty(CLUSTER_ID) + ":" + topicName + "/" + tag), error))); diff --git a/src/test/java/com/michelin/ns4kafka/controllers/TopicControllerTest.java b/src/test/java/com/michelin/ns4kafka/controllers/TopicControllerTest.java index 3854ce35..4647e989 100644 --- a/src/test/java/com/michelin/ns4kafka/controllers/TopicControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controllers/TopicControllerTest.java @@ -434,7 +434,7 @@ void shouldNotValidateTagsWhenNoNewTag() throws InterruptedException, ExecutionE .name("test.topic") .build()) .spec(Topic.TopicSpec.builder() - .tags(List.of("TAG1")) + .tags(Arrays.asList("TAG1")) .replicationFactor(3) .partitions(3) .configs(Map.of("cleanup.policy", "delete", diff --git a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java index fa277b11..6ee95d6a 100644 --- a/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/TopicServiceTest.java @@ -14,7 +14,6 @@ import com.michelin.ns4kafka.properties.ManagedClusterProperties; import com.michelin.ns4kafka.repositories.TopicRepository; import com.michelin.ns4kafka.services.clients.schema.SchemaRegistryClient; -import com.michelin.ns4kafka.services.clients.schema.entities.TagInfo; import com.michelin.ns4kafka.services.executors.TopicAsyncExecutor; import io.micronaut.context.ApplicationContext; import io.micronaut.inject.qualifiers.Qualifiers; @@ -23,6 +22,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.stream.Stream; @@ -33,7 +33,6 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import reactor.core.publisher.Mono; @ExtendWith(MockitoExtension.class) class TopicServiceTest { @@ -886,26 +885,20 @@ void findAll() { @Test void shouldTagsBeValid() { - Namespace ns = Namespace.builder() - .metadata(ObjectMeta.builder() - .name("namespace") - .cluster("local") - .build()) - .build(); - - Topic topic = Topic.builder() - .metadata(ObjectMeta.builder().name("ns-topic1").build()) - .spec(Topic.TopicSpec.builder() - .tags(List.of("TAG_TEST")).build()) - .build(); - - List tagInfo = List.of(TagInfo.builder().name("TAG_TEST").build()); - - when(managedClusterProperties.stream()).thenReturn(Stream.of( - new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD))); - when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(tagInfo)); - - List validationErrors = topicService.validateTags(ns, topic); + ManagedClusterProperties managedClusterProps = + new ManagedClusterProperties("local", + ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD); + Properties properties = new Properties(); + managedClusterProps.setConfig(properties); + + when(managedClusterProperties.stream()).thenReturn(Stream.of(managedClusterProps)); + + List validationErrors = topicService.validateTags( + Namespace.builder().metadata( + ObjectMeta.builder().name("namespace").cluster("local").build()).build(), + Topic.builder().metadata( + ObjectMeta.builder().name("ns-topic1").build()).spec(Topic.TopicSpec.builder() + .tags(List.of("TAG_TEST")).build()).build()); assertEquals(0, validationErrors.size()); } @@ -931,58 +924,4 @@ void shouldTagsBeInvalidWhenNotConfluentCloud() { assertEquals(1, validationErrors.size()); assertEquals("Invalid value TAG_TEST for tags: Tags are not currently supported.", validationErrors.get(0)); } - - @Test - void shouldTagsBeInvalidWhenNoTagsAllowed() { - Namespace ns = Namespace.builder() - .metadata(ObjectMeta.builder() - .name("namespace") - .cluster("local") - .build()) - .build(); - - Topic topic = Topic.builder() - .metadata(ObjectMeta.builder().name("ns-topic1").build()) - .spec(Topic.TopicSpec.builder() - .tags(List.of("TAG_TEST")).build()) - .build(); - - when(managedClusterProperties.stream()).thenReturn(Stream.of( - new ManagedClusterProperties("local", ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD))); - when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(Collections.emptyList())); - - List validationErrors = topicService.validateTags(ns, topic); - assertEquals(1, validationErrors.size()); - assertEquals( - "Invalid value TAG_TEST for tags: No tags allowed.", - validationErrors.get(0)); - } - - @Test - void shouldTagsBeInvalidWhenNotAllowed() { - Namespace ns = Namespace.builder() - .metadata(ObjectMeta.builder() - .name("namespace") - .cluster("local") - .build()) - .build(); - - Topic topic = Topic.builder() - .metadata(ObjectMeta.builder().name("ns-topic1").build()) - .spec(Topic.TopicSpec.builder() - .tags(List.of("BAD_TAG", "TAG_TEST")).build()) - .build(); - - List tagInfo = List.of(TagInfo.builder().name("TAG_TEST").build()); - - when(managedClusterProperties.stream()) - .thenReturn(Stream.of( - new ManagedClusterProperties("local", - ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD))); - when(schemaRegistryClient.getTags("local")).thenReturn(Mono.just(tagInfo)); - - List validationErrors = topicService.validateTags(ns, topic); - assertEquals(1, validationErrors.size()); - assertEquals("Invalid value BAD_TAG for tags: Available tags are TAG_TEST.", validationErrors.get(0)); - } } diff --git a/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java index aa5e26a2..86313c73 100644 --- a/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutorTest.java @@ -20,6 +20,7 @@ import com.michelin.ns4kafka.services.clients.schema.entities.TagEntities; import com.michelin.ns4kafka.services.clients.schema.entities.TagEntity; import io.micronaut.http.HttpResponse; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Properties; @@ -67,7 +68,7 @@ void shouldDeleteTagsAndNotCreateIfEmpty() { Properties properties = new Properties(); properties.put(CLUSTER_ID, CLUSTER_ID_TEST); - when(schemaRegistryClient.deleteTag(anyString(), + when(schemaRegistryClient.dissociateTag(anyString(), anyString(), anyString())) .thenReturn(Mono.empty()) .thenReturn(Mono.error(new Exception("error"))); @@ -96,8 +97,8 @@ void shouldDeleteTagsAndNotCreateIfEmpty() { topicAsyncExecutor.alterTags(ns4kafkaTopics, brokerTopics); - verify(schemaRegistryClient).deleteTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG2); - verify(schemaRegistryClient).deleteTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG3); + verify(schemaRegistryClient).dissociateTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG2); + verify(schemaRegistryClient).dissociateTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG3); } @Test @@ -105,37 +106,114 @@ void shouldCreateTags() { Properties properties = new Properties(); properties.put(CLUSTER_ID, CLUSTER_ID_TEST); - when(schemaRegistryClient.addTags(anyString(), anyList())) - .thenReturn(Mono.empty()) - .thenReturn(Mono.error(new Exception("error"))); + when(schemaRegistryClient.associateTags(anyString(), anyList())) + .thenReturn(Mono.just(List.of())); + when(schemaRegistryClient.createTags(anyList(), anyString())) + .thenReturn(Mono.just(List.of())); when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER); when(managedClusterProperties.getConfig()).thenReturn(properties); List ns4kafkaTopics = List.of( - Topic.builder() - .metadata(ObjectMeta.builder() - .name(TOPIC_NAME) - .build()) - .spec(Topic.TopicSpec.builder() - .tags(List.of(TAG1)) - .build()) - .build()); + Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .tags(List.of(TAG1)) + .build()) + .build()); Map brokerTopics = Map.of(TOPIC_NAME, - Topic.builder() - .metadata(ObjectMeta.builder() - .name(TOPIC_NAME) - .build()) - .spec(Topic.TopicSpec.builder() - .build()) - .build()); + Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .build()) + .build()); + + topicAsyncExecutor.alterTags(ns4kafkaTopics, brokerTopics); + + verify(schemaRegistryClient).associateTags(eq(LOCAL_CLUSTER), argThat(tags -> + tags.get(0).entityName().equals(CLUSTER_ID_TEST + ":" + TOPIC_NAME) + && tags.get(0).typeName().equals(TAG1) + && tags.get(0).entityType().equals(TOPIC_ENTITY_TYPE))); + } + + @Test + void shouldCreateTagsButNotAssociateThem() { + Properties properties = new Properties(); + properties.put(CLUSTER_ID, CLUSTER_ID_TEST); + + when(schemaRegistryClient.associateTags(anyString(), anyList())) + .thenReturn(Mono.error(new IOException())); + when(schemaRegistryClient.createTags(anyList(), anyString())) + .thenReturn(Mono.just(List.of())); + when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER); + when(managedClusterProperties.getConfig()).thenReturn(properties); + + List ns4kafkaTopics = List.of( + Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .tags(List.of(TAG1)) + .build()) + .build()); + + Map brokerTopics = Map.of(TOPIC_NAME, + Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .build()) + .build()); + + topicAsyncExecutor.alterTags(ns4kafkaTopics, brokerTopics); + + verify(schemaRegistryClient).associateTags(eq(LOCAL_CLUSTER), argThat(tags -> + tags.get(0).entityName().equals(CLUSTER_ID_TEST + ":" + TOPIC_NAME) + && tags.get(0).typeName().equals(TAG1) + && tags.get(0).entityType().equals(TOPIC_ENTITY_TYPE))); + } + + @Test + void shouldNotAssociateTagsWhenCreationFails() { + Properties properties = new Properties(); + properties.put(CLUSTER_ID, CLUSTER_ID_TEST); + + when(schemaRegistryClient.createTags(anyList(), anyString())) + .thenReturn(Mono.error(new IOException())); + when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER); + when(managedClusterProperties.getConfig()).thenReturn(properties); + + List ns4kafkaTopics = List.of( + Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .tags(List.of(TAG1)) + .build()) + .build()); + + Map brokerTopics = Map.of(TOPIC_NAME, + Topic.builder() + .metadata(ObjectMeta.builder() + .name(TOPIC_NAME) + .build()) + .spec(Topic.TopicSpec.builder() + .build()) + .build()); topicAsyncExecutor.alterTags(ns4kafkaTopics, brokerTopics); - verify(schemaRegistryClient).addTags(eq(LOCAL_CLUSTER), argThat(tags -> - tags.get(0).entityName().equals(CLUSTER_ID_TEST + ":" + TOPIC_NAME) - && tags.get(0).typeName().equals(TAG1) - && tags.get(0).entityType().equals(TOPIC_ENTITY_TYPE))); + verify(schemaRegistryClient, never()).associateTags(eq(LOCAL_CLUSTER), argThat(tags -> + tags.get(0).entityName().equals(CLUSTER_ID_TEST + ":" + TOPIC_NAME) + && tags.get(0).typeName().equals(TAG1) + && tags.get(0).entityType().equals(TOPIC_ENTITY_TYPE))); } @Test @@ -161,7 +239,7 @@ void shouldDeleteTopicNoTags() throws ExecutionException, InterruptedException, topicAsyncExecutor.deleteTopic(topic); - verify(schemaRegistryClient, never()).deleteTag(any(), any(), any()); + verify(schemaRegistryClient, never()).dissociateTag(any(), any(), any()); } @Test @@ -181,7 +259,7 @@ void shouldDeleteTopicSelfManagedCluster() throws ExecutionException, Interrupte topicAsyncExecutor.deleteTopic(topic); - verify(schemaRegistryClient, never()).deleteTag(any(), any(), any()); + verify(schemaRegistryClient, never()).dissociateTag(any(), any(), any()); } @Test @@ -195,7 +273,7 @@ void shouldDeleteTopicAndTags() throws ExecutionException, InterruptedException, when(managedClusterProperties.getAdminClient()).thenReturn(adminClient); when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER); when(managedClusterProperties.getConfig()).thenReturn(properties); - when(schemaRegistryClient.deleteTag(anyString(), + when(schemaRegistryClient.dissociateTag(anyString(), anyString(), anyString())) .thenReturn(Mono.just(HttpResponse.ok())) .thenReturn(Mono.error(new Exception("error"))); @@ -211,7 +289,7 @@ void shouldDeleteTopicAndTags() throws ExecutionException, InterruptedException, topicAsyncExecutor.deleteTopic(topic); - verify(schemaRegistryClient).deleteTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG1); + verify(schemaRegistryClient).dissociateTag(LOCAL_CLUSTER, CLUSTER_ID_TEST + ":" + TOPIC_NAME, TAG1); } @Test