diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java index b9a5268c..6371ea36 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java @@ -6,7 +6,9 @@ import java.util.Map; import java.util.Properties; +import lombok.AccessLevel; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -16,6 +18,7 @@ * The class to represent the context of the KStream. */ @Slf4j +@NoArgsConstructor(access = AccessLevel.PRIVATE) public class KafkaStreamsExecutionContext { /** @@ -36,6 +39,7 @@ public class KafkaStreamsExecutionContext { * The properties of the stream execution context. */ @Getter + @Setter private static Properties properties; /** @@ -51,9 +55,6 @@ public class KafkaStreamsExecutionContext { @Getter private static String prefix; - private KafkaStreamsExecutionContext() { - } - /** * Register KStream properties. * diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContextTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContextTest.java new file mode 100644 index 00000000..783ed928 --- /dev/null +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContextTest.java @@ -0,0 +1,59 @@ +package com.michelin.kstreamplify.context; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.Properties; +import org.apache.kafka.streams.StreamsConfig; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class KafkaStreamsExecutionContextTest { + + @BeforeEach + void setUp() { + KafkaStreamsExecutionContext.setProperties(null); + } + + @Test + void shouldNotRegisterPropertiesWhenNull() { + KafkaStreamsExecutionContext.registerProperties(null); + assertNull(KafkaStreamsExecutionContext.getProperties()); + } + + @Test + void shouldAddPrefixToAppId() { + Properties properties = new Properties(); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); + properties.put("prefix.self", "abc."); + + KafkaStreamsExecutionContext.registerProperties(properties); + + assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix()); + assertEquals("abc.appId", KafkaStreamsExecutionContext.getProperties() + .get(StreamsConfig.APPLICATION_ID_CONFIG)); + } + + @Test + void shouldNotAddPrefixToAppIdIfNoPrefix() { + Properties properties = new Properties(); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); + + KafkaStreamsExecutionContext.registerProperties(properties); + + assertEquals("", KafkaStreamsExecutionContext.getPrefix()); + assertEquals("appId", KafkaStreamsExecutionContext.getProperties() + .get(StreamsConfig.APPLICATION_ID_CONFIG)); + } + + @Test + void shouldNotAddPrefixToAppIdIfNotAppId() { + Properties properties = new Properties(); + properties.put("prefix.self", "abc."); + + KafkaStreamsExecutionContext.registerProperties(properties); + + assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix()); + assertNull(KafkaStreamsExecutionContext.getProperties().get(StreamsConfig.APPLICATION_ID_CONFIG)); + } +} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java index 2103a0a9..a80bd38f 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java @@ -34,7 +34,7 @@ class DedupWithPredicateProcessorTest { private TimestampedKeyValueStore store; @BeforeEach - public void setUp() { + void setUp() { // Create an instance of DedupWithPredicateProcessor for testing processor = new DedupWithPredicateProcessor<>("testStore", Duration.ofHours(1), TestKeyExtractor::extract); diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java index 64d68a0c..7c908973 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java @@ -11,12 +11,15 @@ import java.util.Properties; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.mockito.junit.jupiter.MockitoExtension; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.CompressionType; import org.rocksdb.Options; +@ExtendWith(MockitoExtension.class) class RocksDbConfigTest { @Mock @@ -24,7 +27,6 @@ class RocksDbConfigTest { @BeforeEach void setUp() { - MockitoAnnotations.openMocks(this); when(options.tableFormatConfig()).thenReturn(new BlockBasedTableConfig()); }