diff --git a/.checkstyle/checkstyle.xml b/.checkstyle/checkstyle.xml new file mode 100644 index 00000000..32c0f31e --- /dev/null +++ b/.checkstyle/checkstyle.xml @@ -0,0 +1,382 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/.github/workflows/on_pull_request.yml b/.github/workflows/on_pull_request.yml index 9cd5c772..32a5d5df 100644 --- a/.github/workflows/on_pull_request.yml +++ b/.github/workflows/on_pull_request.yml @@ -20,6 +20,9 @@ jobs: distribution: 'temurin' cache: maven + - name: Check Style + run: mvn checkstyle:check + - name: Build run: mvn clean compile diff --git a/.github/workflows/on_push_main.yml b/.github/workflows/on_push_main.yml index accf452e..885d56a8 100644 --- a/.github/workflows/on_push_main.yml +++ b/.github/workflows/on_push_main.yml @@ -25,6 +25,9 @@ jobs: gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }} gpg-passphrase: MAVEN_GPG_PASSPHRASE + - name: Check Style + run: mvn checkstyle:check + - name: Build id: build run: | diff --git a/.readme/contributing/check_style.png b/.readme/contributing/check_style.png new file mode 100644 index 00000000..9db02b4c Binary files /dev/null and b/.readme/contributing/check_style.png differ diff --git a/.readme/contributing/reformat_code.png b/.readme/contributing/reformat_code.png new file mode 100644 index 00000000..31bf239b Binary files /dev/null and b/.readme/contributing/reformat_code.png differ diff --git a/.readme/contributing/save_actions.png b/.readme/contributing/save_actions.png new file mode 100644 index 00000000..cfbb7a63 Binary files /dev/null and b/.readme/contributing/save_actions.png differ diff --git a/.readme/contributing/scan.png b/.readme/contributing/scan.png new file mode 100644 index 00000000..c099146d Binary files /dev/null and b/.readme/contributing/scan.png differ diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 96c59105..b997a970 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -35,7 +35,32 @@ In general, we follow the ["fork-and-pull" Git workflow](https://github.com/susa - Push changes to your fork - Open a PR in our repository targeting master and follow the PR template so that we can efficiently review the changes. -## Styleguides +## Style Guide + +### Code Style + +We maintain a consistent code style using Checkstyle. + +#### IntelliJ + +To ensure code style consistency in IntelliJ, follow these steps: + +1. Install the [CheckStyle-IDEA plugin](https://plugins.jetbrains.com/plugin/1065-checkstyle-idea). +2. Create a new CheckStyle configuration for Kstreamplify based on the code style configuration located in the `.checkstyle` folder. Configure it as follows: + +![check_style.png](.readme%2Fcontributing%2Fcheck_style.png) + +3. Enable the "Reformat code" and "Optimize imports" options in the save actions: + +![save_actions.png](.readme%2Fcontributing%2Fsave_actions.png) + +4. Reformat your code with the Checkstyle configuration: + +![reformat_code.png](.readme%2Fcontributing%2Freformat_code.png) + +5. 
Before committing your changes, ensure your contribution doesn't introduce any problems by running a scan: + +![scan.png](.readme%2Fcontributing%2Fscan.png) ### Git Commit Messages diff --git a/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java b/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java index 1496632f..51ab78d2 100644 --- a/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java +++ b/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java @@ -7,6 +7,12 @@ import com.michelin.kstreamplify.utils.TopicWithSerde; import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.time.Instant; +import java.util.Collections; +import java.util.Properties; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -16,32 +22,25 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.time.Instant; -import java.util.Collections; -import java.util.Properties; - /** - *

<p>The main test class to extend to execute unit tests on topology</p>
+ * <p>The main test class to extend to execute unit tests on topology</p>.
 * <p>It provides a {@link TopologyTestDriver} and a {@link TestOutputTopic} for the DLQ</p>
*/ public abstract class KafkaStreamsStarterTest { private static final String STATE_DIR = "/tmp/kafka-streams/"; /** - * The topology test driver + * The topology test driver. */ protected TopologyTestDriver testDriver; /** - * The dlq topic, initialized in {@link #generalSetUp()} + * The dlq topic, initialized in {@link #generalSetUp()}. */ protected TestOutputTopic dlqTopic; /** - * Set up topology test driver + * Set up topology test driver. */ @BeforeEach void generalSetUp() { @@ -52,7 +51,8 @@ void generalSetUp() { KafkaStreamsExecutionContext.registerProperties(properties); KafkaStreamsExecutionContext.setSerdesConfig(Collections - .singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://" + getClass().getName())); + .singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, + "mock://" + getClass().getName())); var starter = getKafkaStreamsStarter(); @@ -61,20 +61,22 @@ void generalSetUp() { StreamsBuilder streamsBuilder = new StreamsBuilder(); starter.topology(streamsBuilder); - testDriver = new TopologyTestDriver(streamsBuilder.build(), properties, getInitialWallClockTime()); + testDriver = + new TopologyTestDriver(streamsBuilder.build(), properties, getInitialWallClockTime()); - dlqTopic = testDriver.createOutputTopic(KafkaStreamsExecutionContext.getDlqTopicName(), new StringDeserializer(), SerdesUtils.getSerdesForValue().deserializer()); + dlqTopic = testDriver.createOutputTopic(KafkaStreamsExecutionContext.getDlqTopicName(), + new StringDeserializer(), SerdesUtils.getSerdesForValue().deserializer()); } /** - * Method to override to provide the KafkaStreamsStarter to test + * Method to override to provide the KafkaStreamsStarter to test. * * @return The KafkaStreamsStarter to test */ protected abstract KafkaStreamsStarter getKafkaStreamsStarter(); /** - * Default base wall clock time for topology test driver + * Default base wall clock time for topology test driver. * * @return The default wall clock time as instant */ @@ -83,7 +85,7 @@ protected Instant getInitialWallClockTime() { } /** - * Method to close everything properly at the end of the test + * Method to close everything properly at the end of the test. */ @AfterEach void generalTearDown() throws IOException { @@ -93,26 +95,31 @@ void generalTearDown() throws IOException { } /** - * Creates an input test topic on the testDriver using the provided topicWithSerde + * Creates an input test topic on the testDriver using the provided topicWithSerde. * * @param topicWithSerde The topic with serde used to crete the test topic * @param The serializable type of the key * @param The serializable type of the value * @return The corresponding TestInputTopic */ - protected TestInputTopic createInputTestTopic(TopicWithSerde topicWithSerde) { - return this.testDriver.createInputTopic(topicWithSerde.getUnPrefixedName(), topicWithSerde.getKeySerde().serializer(), topicWithSerde.getValueSerde().serializer()); + protected TestInputTopic createInputTestTopic( + TopicWithSerde topicWithSerde) { + return this.testDriver.createInputTopic(topicWithSerde.getUnPrefixedName(), + topicWithSerde.getKeySerde().serializer(), topicWithSerde.getValueSerde().serializer()); } /** - * Creates an output test topic on the testDriver using the provided topicWithSerde + * Creates an output test topic on the testDriver using the provided topicWithSerde. 
* * @param topicWithSerde The topic with serde used to crete the test topic * @param The serializable type of the key * @param The serializable type of the value * @return The corresponding TestOutputTopic */ - protected TestOutputTopic createOutputTestTopic(TopicWithSerde topicWithSerde) { - return this.testDriver.createOutputTopic(topicWithSerde.getUnPrefixedName(), topicWithSerde.getKeySerde().deserializer(), topicWithSerde.getValueSerde().deserializer()); + protected TestOutputTopic createOutputTestTopic( + TopicWithSerde topicWithSerde) { + return this.testDriver.createOutputTopic(topicWithSerde.getUnPrefixedName(), + topicWithSerde.getKeySerde().deserializer(), + topicWithSerde.getValueSerde().deserializer()); } } diff --git a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java index db52f428..c30c20b1 100644 --- a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java +++ b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java @@ -1,10 +1,13 @@ package com.michelin.kstreamplify; +import static org.junit.jupiter.api.Assertions.assertEquals; + import com.michelin.kstreamplify.avro.KafkaError; import com.michelin.kstreamplify.error.ProcessingResult; import com.michelin.kstreamplify.error.TopologyErrorHandler; import com.michelin.kstreamplify.initializer.KafkaStreamsStarter; import com.michelin.kstreamplify.utils.SerdesUtils; +import java.util.List; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -17,20 +20,17 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; - class TopologyErrorHandlerTest extends KafkaStreamsStarterTest { - private final String AVRO_TOPIC = "avroTopic"; + private static final String AVRO_TOPIC = "avroTopic"; + private static final String STRING_TOPIC = "stringTopic"; + private static final String OUTPUT_AVRO_TOPIC = "outputAvroTopic"; + private static final String OUTPUT_STRING_TOPIC = "outputStringTopic"; + private static final String DLQ_TOPIC = "dlqTopic"; + private TestInputTopic avroInputTopic; - private final String STRING_TOPIC = "stringTopic"; private TestInputTopic stringInputTopic; - private final String OUTPUT_AVRO_TOPIC = "outputAvroTopic"; private TestOutputTopic avroOutputTopic; - private final String OUTPUT_STRING_TOPIC = "outputStringTopic"; private TestOutputTopic stringOutputTopic; - private final String DLQ_TOPIC = "dlqTopic"; private TestOutputTopic dlqTopic; @Override @@ -44,33 +44,44 @@ public String dlqTopic() { @Override public void topology(StreamsBuilder streamsBuilder) { KStream> stringStream = streamsBuilder - .stream(STRING_TOPIC, Consumed.with(Serdes.String(), Serdes.String())) - .mapValues(value -> "error".equals(value) ? - ProcessingResult.fail(new NullPointerException(), value) : ProcessingResult.success(value)); + .stream(STRING_TOPIC, Consumed.with(Serdes.String(), Serdes.String())) + .mapValues(value -> "error".equals(value) + ? 
ProcessingResult.fail(new NullPointerException(), value) : + ProcessingResult.success(value)); TopologyErrorHandler.catchErrors(stringStream) - .to(OUTPUT_STRING_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + .to(OUTPUT_STRING_TOPIC, Produced.with(Serdes.String(), Serdes.String())); - KStream> avroStream = streamsBuilder - .stream(AVRO_TOPIC, Consumed.with(Serdes.String(), SerdesUtils.getSerdesForValue())) - .mapValues(value -> value == null ? - ProcessingResult.fail(new NullPointerException(), null) : ProcessingResult.success(value)); + KStream> avroStream = + streamsBuilder + .stream(AVRO_TOPIC, Consumed.with(Serdes.String(), + SerdesUtils.getSerdesForValue())) + .mapValues(value -> value == null + ? ProcessingResult.fail(new NullPointerException(), null) : + ProcessingResult.success(value)); TopologyErrorHandler.catchErrors(avroStream) - .to(OUTPUT_AVRO_TOPIC, Produced.with(Serdes.String(), SerdesUtils.getSerdesForValue())); + .to(OUTPUT_AVRO_TOPIC, + Produced.with(Serdes.String(), SerdesUtils.getSerdesForValue())); } }; } @BeforeEach void setUp() { - stringInputTopic = testDriver.createInputTopic(STRING_TOPIC, new StringSerializer(), new StringSerializer()); - avroInputTopic = testDriver.createInputTopic(AVRO_TOPIC, new StringSerializer(), SerdesUtils.getSerdesForValue().serializer()); - - stringOutputTopic = testDriver.createOutputTopic(OUTPUT_STRING_TOPIC, new StringDeserializer(), new StringDeserializer()); - avroOutputTopic = testDriver.createOutputTopic(OUTPUT_AVRO_TOPIC, new StringDeserializer(), SerdesUtils.getSerdesForValue().deserializer()); - - dlqTopic = testDriver.createOutputTopic(DLQ_TOPIC, new StringDeserializer(), SerdesUtils.getSerdesForValue().deserializer()); + stringInputTopic = testDriver.createInputTopic(STRING_TOPIC, new StringSerializer(), + new StringSerializer()); + avroInputTopic = testDriver.createInputTopic(AVRO_TOPIC, new StringSerializer(), + SerdesUtils.getSerdesForValue().serializer()); + + stringOutputTopic = + testDriver.createOutputTopic(OUTPUT_STRING_TOPIC, new StringDeserializer(), + new StringDeserializer()); + avroOutputTopic = testDriver.createOutputTopic(OUTPUT_AVRO_TOPIC, new StringDeserializer(), + SerdesUtils.getSerdesForValue().deserializer()); + + dlqTopic = testDriver.createOutputTopic(DLQ_TOPIC, new StringDeserializer(), + SerdesUtils.getSerdesForValue().deserializer()); } @Test @@ -86,7 +97,7 @@ void shouldContinueWhenProcessingValueIsValid() { } @Test - void shouldSendExceptionToDLQWhenProcessingValueIsInvalid() { + void shouldSendExceptionToDlqWhenProcessingValueIsInvalid() { stringInputTopic.pipeInput("key", "error"); var resultDlq = dlqTopic.readValuesToList(); @@ -100,13 +111,13 @@ void shouldSendExceptionToDLQWhenProcessingValueIsInvalid() { void shouldContinueWhenProcessingValueIsValidAvro() { KafkaError avroModel = KafkaError.newBuilder() - .setTopic("topic") - .setStack("stack") - .setPartition(0) - .setOffset(0) - .setCause("cause") - .setValue("value") - .build(); + .setTopic("topic") + .setStack("stack") + .setPartition(0) + .setOffset(0) + .setCause("cause") + .setValue("value") + .build(); avroInputTopic.pipeInput("key", avroModel); diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/HttpServerConstants.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/HttpServerConstants.java index ebd625df..9ee74b92 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/HttpServerConstants.java +++ 
b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/HttpServerConstants.java @@ -4,37 +4,37 @@ import lombok.NoArgsConstructor; /** - * HTTP server constants + * HTTP server constants. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class HttpServerConstants { /** - * Readiness probe path property name + * Readiness probe path property name. */ public static final String READINESS_PROPERTY = "readiness_path"; /** - * Liveness probe path property name + * Liveness probe path property name. */ public static final String LIVENESS_PROPERTY = "liveness_path"; /** - * Topology property name + * Topology property name. */ public static final String TOPOLOGY_PROPERTY = "expose_topology_path"; /** - * Readiness default path + * Readiness default path. */ public static final String READINESS_DEFAULT_PATH = "ready"; /** - * Liveness default path + * Liveness default path. */ public static final String LIVENESS_DEFAULT_PATH = "liveness"; /** - * Topology default path + * Topology default path. */ public static final String TOPOLOGY_DEFAULT_PATH = "topology"; } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/InitializerConstants.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/InitializerConstants.java index 67ed8daf..1d278b7c 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/InitializerConstants.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/InitializerConstants.java @@ -4,27 +4,27 @@ import lombok.NoArgsConstructor; /** - * Kafka Streams initialization constants + * Kafka Streams initialization constants. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class InitializerConstants { /** - * Server port property name + * Server port property name. */ public static final String SERVER_PORT_PROPERTY = "server.port"; /** - * Default host + * Default host. */ public static final String LOCALHOST = "localhost"; /** - * Name of the property containing of the name of the var env containing the IP + * Name of the property containing of the name of the var env containing the IP. */ public static final String IP_SYSTEM_VARIABLE_PROPERTY = "ip.env.var.name"; /** - * Default var env name containing the IP + * Default var env name containing the IP. */ public static final String IP_SYSTEM_VARIABLE_DEFAULT = "MY_POD_IP"; } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/PropertyConstants.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/PropertyConstants.java index 1a07ac13..bb462dcd 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/PropertyConstants.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/PropertyConstants.java @@ -4,42 +4,42 @@ import lombok.NoArgsConstructor; /** - * Property constants + * Property constants. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class PropertyConstants { /** - * Property separator + * Property separator. */ public static final String PROPERTY_SEPARATOR = "."; /** - * Kafka properties prefix + * Kafka properties prefix. */ public static final String KAFKA_PROPERTIES_PREFIX = "kafka.properties"; /** - * Default property file name + * Default property file name. */ public static final String DEFAULT_PROPERTY_FILE = "application.yml"; /** - * Prefix property name + * Prefix property name. 
*/ public static final String PREFIX_PROPERTY_NAME = "prefix"; /** - * Topic property name + * Topic property name. */ public static final String TOPIC_PROPERTY_NAME = "topic"; /** - * Remap property name + * Remap property name. */ public static final String REMAP_PROPERTY_NAME = "remap"; /** - * Default prefix property name + * Default prefix property name. */ public static final String SELF = "self"; } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java index 94ca389a..b9a5268c 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java @@ -1,39 +1,39 @@ package com.michelin.kstreamplify.context; -import com.michelin.kstreamplify.constants.PropertyConstants; +import static com.michelin.kstreamplify.constants.PropertyConstants.PREFIX_PROPERTY_NAME; +import static com.michelin.kstreamplify.constants.PropertyConstants.PROPERTY_SEPARATOR; +import static com.michelin.kstreamplify.constants.PropertyConstants.SELF; + +import java.util.Map; +import java.util.Properties; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.streams.StreamsConfig; -import java.util.Map; -import java.util.Properties; - -import static com.michelin.kstreamplify.constants.PropertyConstants.*; - /** - * The class to represent the context of the KStream + * The class to represent the context of the KStream. */ @Slf4j public class KafkaStreamsExecutionContext { /** - * the DLQ topic name + * The DLQ topic name. */ @Getter @Setter private static String dlqTopicName; /** - * the Serdes config Map + * The Serdes config Map. */ @Getter @Setter private static Map serdesConfig; /** - * the properties of the stream execution context + * The properties of the stream execution context. */ @Getter private static Properties properties; @@ -47,7 +47,6 @@ public class KafkaStreamsExecutionContext { * prefix: * self: "myNamespacePrefix." * } - * */ @Getter private static String prefix; @@ -56,7 +55,7 @@ private KafkaStreamsExecutionContext() { } /** - * Register KStream properties + * Register KStream properties. 
* * @param properties The Kafka Streams properties */ @@ -66,14 +65,17 @@ public static void registerProperties(Properties properties) { } prefix = properties.getProperty(PREFIX_PROPERTY_NAME + PROPERTY_SEPARATOR + SELF, ""); - if (StringUtils.isNotBlank(prefix) && properties.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) { + if (StringUtils.isNotBlank(prefix) + && properties.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) { properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, - prefix.concat(properties.getProperty(StreamsConfig.APPLICATION_ID_CONFIG))); + prefix.concat(properties.getProperty(StreamsConfig.APPLICATION_ID_CONFIG))); } KafkaStreamsExecutionContext.properties = properties; StringBuilder stringBuilderProperties = new StringBuilder("Kafka Stream properties:\n"); - properties.forEach((key, value) -> stringBuilderProperties.append("\t").append(key).append(" = ").append(value).append("\n")); + properties.forEach( + (key, value) -> stringBuilderProperties.append("\t").append(key).append(" = ") + .append(value).append("\n")); log.info(stringBuilderProperties.toString()); } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/AvroToJsonConverter.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/AvroToJsonConverter.java index e1ef992c..d74acf2a 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/AvroToJsonConverter.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/AvroToJsonConverter.java @@ -19,27 +19,28 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import org.apache.avro.Schema.Field; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.util.Utf8; - import java.time.Instant; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.avro.Schema.Field; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; /** - * The class to convert Avro to Json + * The class to convert Avro to Json. */ public class AvroToJsonConverter { - private AvroToJsonConverter() { } + private AvroToJsonConverter() { + } private static final Gson gson = new GsonBuilder() - .setPrettyPrinting() - .create(); + .setPrettyPrinting() + .create(); /** - * Convert the record from avro format to json format + * Convert the record from avro format to json format. + * * @param inputRecord the record in avro format * @return the record in json format */ @@ -48,7 +49,8 @@ public static String convertRecord(GenericRecord inputRecord) { } /** - * convert avro to a map for json format + * Convert avro to a map for json format. 
+ * * @param inputRecord record in avro * @return map for json format */ @@ -64,15 +66,15 @@ private static Map recordAsMap(GenericRecord inputRecord) { if (recordValue instanceof List recordValueAsList) { recordValue = recordValueAsList - .stream() - .map(value -> { - if (value instanceof GenericRecord genericRecord) { - return recordAsMap(genericRecord); - } else { - return value.toString(); - } - }) - .toList(); + .stream() + .map(value -> { + if (value instanceof GenericRecord genericRecord) { + return recordAsMap(genericRecord); + } else { + return value.toString(); + } + }) + .toList(); } if (recordValue instanceof Map recordValueAsMap) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java index b3ba9455..d6fc5bb2 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java @@ -4,11 +4,6 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.specific.SpecificRecordBase; - import java.math.BigDecimal; import java.math.MathContext; import java.math.RoundingMode; @@ -17,15 +12,20 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecordBase; /** - * The class to convert Json to Avro + * The class to convert Json to Avro. */ public class JsonToAvroConverter { /** - * convert a file in json to avro - * @param file the file in json + * Convert a file in json to avro. + * + * @param file the file in json * @param schema the avro schema to use * @return the record in avro */ @@ -34,123 +34,149 @@ public static SpecificRecordBase jsonToAvro(String file, Schema schema) { } /** - * convert json to avro + * Convert json to avro. + * * @param jsonEvent the json record - * @param schema the avro schema to use + * @param schema the avro schema to use * @return the record in avro */ public static SpecificRecordBase jsonToAvro(JsonObject jsonEvent, Schema schema) { try { - SpecificRecordBase record = baseClass(schema.getNamespace(), schema.getName()).getDeclaredConstructor().newInstance(); - populateGenericRecordFromJson(jsonEvent, record); - return record; + SpecificRecordBase message = + baseClass(schema.getNamespace(), schema.getName()).getDeclaredConstructor() + .newInstance(); + populateGenericRecordFromJson(jsonEvent, message); + return message; } catch (Exception e) { return null; } } /** - * populate avro records from json + * Populate avro records from json. 
+ * * @param jsonObject json data to provide to the avro record - * @param record the avro record to populate + * @param message the avro record to populate */ - private static void populateGenericRecordFromJson(JsonObject jsonObject, SpecificRecordBase record) { - // Iterate over object attributes - jsonObject.keySet().forEach( - currentKey -> { - try { - var currentValue = jsonObject.get(currentKey); + private static void populateGenericRecordFromJson(JsonObject jsonObject, + SpecificRecordBase message) { + // Iterate over object attributes + jsonObject.keySet().forEach( + currentKey -> { + try { + var currentValue = jsonObject.get(currentKey); - // If this is an object, add to prefix and call method again - if (currentValue instanceof JsonObject currentValueJsonObject) { - Schema currentSchema = record.getSchema().getField(currentKey).schema(); + // If this is an object, add to prefix and call method again + if (currentValue instanceof JsonObject currentValueJsonObject) { + Schema currentSchema = message.getSchema().getField(currentKey).schema(); - // If the current value is a UNION - if (currentSchema.getType().equals(Schema.Type.UNION)) { - // Then research the first NOT NULL sub value - Optional notNullSchema = currentSchema.getTypes().stream() - .filter(s -> !s.getType().equals(Schema.Type.NULL)) - .findAny(); + // If the current value is a UNION + if (currentSchema.getType().equals(Schema.Type.UNION)) { + // Then research the first NOT NULL sub value + Optional notNullSchema = currentSchema.getTypes().stream() + .filter(s -> !s.getType().equals(Schema.Type.NULL)) + .findAny(); - if (notNullSchema.isPresent()) { - currentSchema = notNullSchema.get(); - } + if (notNullSchema.isPresent()) { + currentSchema = notNullSchema.get(); } + } - switch (currentSchema.getType()) { - case RECORD -> { - SpecificRecordBase currentRecord = baseClass(record.getSchema().getNamespace(), currentSchema.getName()).getDeclaredConstructor().newInstance(); - populateGenericRecordFromJson(currentValueJsonObject, currentRecord); - record.put(currentKey, currentRecord); - } - case MAP -> { - Map map = new HashMap<>(); - if (!currentSchema.getValueType().getType().equals(Schema.Type.RECORD)) { - for (String key : currentValueJsonObject.keySet()) { - Object value = populateFieldWithCorrespondingType(currentValueJsonObject.get(key), currentSchema.getValueType().getType()); - map.put(key, value); - } - } else { - for (String key : currentValueJsonObject.keySet()) { - SpecificRecordBase mapValueRecord = baseClass(record.getSchema().getNamespace(), currentSchema.getValueType().getName()).getDeclaredConstructor().newInstance(); - populateGenericRecordFromJson(currentValueJsonObject.get(key).getAsJsonObject(), mapValueRecord); - map.put(key, mapValueRecord); - } + switch (currentSchema.getType()) { + case RECORD -> { + SpecificRecordBase currentRecord = + baseClass(message.getSchema().getNamespace(), + currentSchema.getName()).getDeclaredConstructor() + .newInstance(); + populateGenericRecordFromJson(currentValueJsonObject, + currentRecord); + message.put(currentKey, currentRecord); + } + case MAP -> { + Map map = new HashMap<>(); + if (!currentSchema.getValueType().getType() + .equals(Schema.Type.RECORD)) { + for (String key : currentValueJsonObject.keySet()) { + Object value = populateFieldWithCorrespondingType( + currentValueJsonObject.get(key), + currentSchema.getValueType().getType()); + map.put(key, value); + } + } else { + for (String key : currentValueJsonObject.keySet()) { + SpecificRecordBase mapValueRecord 
= + baseClass(message.getSchema().getNamespace(), + currentSchema.getValueType() + .getName()).getDeclaredConstructor() + .newInstance(); + populateGenericRecordFromJson( + currentValueJsonObject.get(key).getAsJsonObject(), + mapValueRecord); + map.put(key, mapValueRecord); } - record.put(currentKey, map); } - default -> record.put(currentKey, - populateFieldWithCorrespondingType(currentValue, currentSchema.getType())); + message.put(currentKey, map); } + default -> message.put(currentKey, + populateFieldWithCorrespondingType(currentValue, + currentSchema.getType())); } - + } else if (currentValue instanceof JsonArray jsonArray) { // If this is an Array, call method for each one of them - else if (currentValue instanceof JsonArray jsonArray) { - var arraySchema = record.getSchema().getField(currentKey).schema(); - Schema arrayType = arraySchema.getType() != Schema.Type.UNION ? - arraySchema : - arraySchema.getTypes().stream() - .filter(s -> s.getType() != Schema.Type.NULL) - .findFirst().get(); - Schema elementType = arrayType.getElementType(); + var arraySchema = message.getSchema().getField(currentKey).schema(); + Schema arrayType = arraySchema.getType() != Schema.Type.UNION + ? arraySchema : + arraySchema.getTypes().stream() + .filter(s -> s.getType() != Schema.Type.NULL) + .findFirst().get(); + Schema elementType = arrayType.getElementType(); - if (elementType != null && Schema.Type.RECORD.equals(elementType.getType())) { - ArrayList recordArray = new ArrayList<>(); - for (int i = 0; i < jsonArray.size(); i++) { - SpecificRecordBase currentRecord = baseClass(record.getSchema().getNamespace(), elementType.getName()).getDeclaredConstructor().newInstance(); - populateGenericRecordFromJson((JsonObject) jsonArray.get(i), currentRecord); - recordArray.add(currentRecord); - } - record.put(currentKey, recordArray); - } else { - ArrayList objArray = new ArrayList<>(); - for (int i = 0; i < ((JsonArray) currentValue).size(); i++) { - Object obj = populateFieldWithCorrespondingType((((JsonArray) currentValue).get(i)), elementType.getType()); - objArray.add(obj); - } - record.put(currentKey, objArray); + if (elementType != null + && Schema.Type.RECORD.equals(elementType.getType())) { + ArrayList recordArray = new ArrayList<>(); + for (int i = 0; i < jsonArray.size(); i++) { + SpecificRecordBase currentRecord = + baseClass(message.getSchema().getNamespace(), + elementType.getName()).getDeclaredConstructor() + .newInstance(); + populateGenericRecordFromJson((JsonObject) jsonArray.get(i), + currentRecord); + recordArray.add(currentRecord); } - } - // Otherwise, put the value in the record after parsing according to its corresponding schema type - else { - if (!jsonObject.get(currentKey).isJsonNull()) { - populateFieldInRecordWithCorrespondingType(jsonObject, currentKey, record); + message.put(currentKey, recordArray); + } else { + ArrayList objArray = new ArrayList<>(); + for (int i = 0; i < ((JsonArray) currentValue).size(); i++) { + Object obj = populateFieldWithCorrespondingType( + (((JsonArray) currentValue).get(i)), elementType.getType()); + objArray.add(obj); } + message.put(currentKey, objArray); + } + } else { + // Otherwise, put the value in the record after parsing according to its + // corresponding schema type + if (!jsonObject.get(currentKey).isJsonNull()) { + populateFieldInRecordWithCorrespondingType(jsonObject, currentKey, + message); } - } catch (Exception e) { - throw new RuntimeException(e); } + } catch (Exception e) { + throw new RuntimeException(e); } - ); + } + ); } /** - * 
populate field with corresponding type + * Populate field with corresponding type. + * * @param jsonElement the json element to convert - * @param type the type of the element + * @param type the type of the element * @return the element converted with the corresponding type */ - private static Object populateFieldWithCorrespondingType(JsonElement jsonElement, Schema.Type type){ + private static Object populateFieldWithCorrespondingType(JsonElement jsonElement, + Schema.Type type) { return switch (type) { case INT -> jsonElement.getAsInt(); case LONG -> jsonElement.getAsLong(); @@ -162,26 +188,32 @@ private static Object populateFieldWithCorrespondingType(JsonElement jsonElement } /** - * populate field in record with corresponding type + * Populate field in record with corresponding type. + * * @param jsonObject data to provide to the avro record - * @param fieldName the name to populate - * @param result the avro record populated + * @param fieldName the name to populate + * @param result the avro record populated */ - private static void populateFieldInRecordWithCorrespondingType(JsonObject jsonObject, String fieldName, GenericRecord result) { + private static void populateFieldInRecordWithCorrespondingType(JsonObject jsonObject, + String fieldName, + GenericRecord result) { Schema fieldSchema = result.getSchema().getField(fieldName).schema(); - Optional optionalFieldType = fieldSchema.getType() != Schema.Type.UNION ? Optional.of(fieldSchema) : + Optional optionalFieldType = + fieldSchema.getType() != Schema.Type.UNION ? Optional.of(fieldSchema) : fieldSchema.getTypes() - .stream() - .filter(s -> s.getType() != Schema.Type.NULL) - .findFirst(); + .stream() + .filter(s -> s.getType() != Schema.Type.NULL) + .findFirst(); if (optionalFieldType.isPresent()) { Schema fieldType = optionalFieldType.get(); switch (fieldType.getType()) { case INT -> result.put(fieldName, jsonObject.get(fieldName).getAsInt()); case LONG -> { - if (fieldType.getLogicalType() != null && fieldType.getLogicalType().getName().equals("timestamp-millis")) { - result.put(fieldName, Instant.ofEpochSecond(jsonObject.get(fieldName).getAsLong())); + if (fieldType.getLogicalType() != null + && fieldType.getLogicalType().getName().equals("timestamp-millis")) { + result.put(fieldName, + Instant.ofEpochSecond(jsonObject.get(fieldName).getAsLong())); } else { result.put(fieldName, jsonObject.get(fieldName).getAsLong()); } @@ -190,12 +222,16 @@ private static void populateFieldInRecordWithCorrespondingType(JsonObject jsonOb case DOUBLE -> result.put(fieldName, jsonObject.get(fieldName).getAsDouble()); case BOOLEAN -> result.put(fieldName, jsonObject.get(fieldName).getAsBoolean()); case BYTES -> { - if (fieldType.getLogicalType() != null && fieldType.getLogicalType().getName().equals("decimal")) { + if (fieldType.getLogicalType() != null + && fieldType.getLogicalType().getName().equals("decimal")) { result.put( - fieldName, - new BigDecimal(jsonObject.get(fieldName).getAsString()) - .setScale(((LogicalTypes.Decimal) fieldType.getLogicalType()).getScale(), RoundingMode.HALF_UP) - .round(new MathContext(((LogicalTypes.Decimal) fieldType.getLogicalType()).getPrecision())) + fieldName, + new BigDecimal(jsonObject.get(fieldName).getAsString()) + .setScale( + ((LogicalTypes.Decimal) fieldType.getLogicalType()).getScale(), + RoundingMode.HALF_UP) + .round(new MathContext( + ((LogicalTypes.Decimal) fieldType.getLogicalType()).getPrecision())) ); } else { // This is not supposed to happen, that would mean that the given field is in Byte 
format @@ -208,9 +244,10 @@ private static void populateFieldInRecordWithCorrespondingType(JsonObject jsonOb } /** - * get base class + * Get base class. + * * @param baseNamespace the namespace of the class - * @param typeName the class type + * @param typeName the class type * @return the base class */ @SuppressWarnings("unchecked") diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java index 0bb4d140..a3494088 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java @@ -1,6 +1,7 @@ package com.michelin.kstreamplify.deduplication; import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.api.Processor; @@ -9,36 +10,36 @@ import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; -import java.time.Duration; - /** - * Transformer class for the deduplication mechanism on keys of a given topic + * Transformer class for the deduplication mechanism on keys of a given topic. * * @param The type of the value */ -public class DedupKeyProcessor implements Processor> { +public class DedupKeyProcessor + implements Processor> { /** - * Kstream context for this transformer + * Kstream context for this transformer. */ private ProcessorContext> processorContext; + /** - * Window store containing all the records seen on the given window + * Window store containing all the records seen on the given window. */ private TimestampedKeyValueStore dedupTimestampedStore; /** - * Window store name, initialized @ construction + * Window store name, initialized @ construction. */ private final String dedupStoreName; /** - * Retention window for the statestore. Used for fetching data + * Retention window for the statestore. Used for fetching data. */ private final Duration retentionWindowDuration; /** - * Constructor + * Constructor. 
* * @param dedupStoreName The name of the constructor * @param retentionWindowDuration The retentionWindow Duration @@ -54,32 +55,37 @@ public void init(ProcessorContext> context) { dedupTimestampedStore = this.processorContext.getStateStore(dedupStoreName); - processorContext.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, currentTimestamp -> { - try (var iterator = dedupTimestampedStore.all()) { - while (iterator.hasNext()) { - var currentRecord = iterator.next(); - if (currentRecord.value.timestamp() + retentionWindowDuration.toMillis() < currentTimestamp) { - dedupTimestampedStore.delete(currentRecord.key); + processorContext.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, + currentTimestamp -> { + try (var iterator = dedupTimestampedStore.all()) { + while (iterator.hasNext()) { + var currentRecord = iterator.next(); + if (currentRecord.value.timestamp() + retentionWindowDuration.toMillis() + < currentTimestamp) { + dedupTimestampedStore.delete(currentRecord.key); + } } } - } - }); + }); } @Override - public void process(Record record) { - String key = record.key(); + public void process(Record message) { + String key = message.key(); try { // Retrieve the matching key in the statestore and return null if found (signaling a duplicate) if (dedupTimestampedStore.get(key) == null) { - // First time we see this record, store entry in the windowstore and forward the record to the output - dedupTimestampedStore.put(key, ValueAndTimestamp.make(key, processorContext.currentStreamTimeMs())); + // First time we see this record, store entry in the window store and forward the record to the output + dedupTimestampedStore.put(key, + ValueAndTimestamp.make(key, processorContext.currentStreamTimeMs())); - processorContext.forward(ProcessingResult.wrapRecordSuccess(record)); + processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); } } catch (Exception e) { - processorContext.forward(ProcessingResult.wrapRecordFailure(e, record, "Couldn't figure out what to do with the current payload: An unlikely error occurred during deduplication transform")); + processorContext.forward(ProcessingResult.wrapRecordFailure(e, message, + "Couldn't figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform")); } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java index 4520d1b2..cc8a48da 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java @@ -2,42 +2,44 @@ import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; +import java.time.Instant; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.WindowStore; -import java.time.Duration; -import java.time.Instant; - /** - * Transformer class for the deduplication mechanism on both keys and values of a given topic + * Transformer class for the deduplication mechanism on both keys and values of a given topic. 
* * @param The type of the value */ -public class DedupKeyValueProcessor implements Processor> { +public class DedupKeyValueProcessor + implements Processor> { /** - * Kstream context for this transformer + * Kstream context for this transformer. */ private ProcessorContext> processorContext; + /** - * Window store containing all the records seen on the given window + * Window store containing all the records seen on the given window. */ private WindowStore dedupWindowStore; /** - * Window store name, initialized @ construction + * Window store name, initialized @ construction. */ private final String windowStoreName; + /** - * Retention window for the statestore. Used for fetching data + * Retention window for the statestore. Used for fetching data. */ private final Duration retentionWindowDuration; /** - * Constructor method + * Constructor. * * @param windowStoreName The window store name * @param retentionWindowHours The retention window duration @@ -55,26 +57,30 @@ public void init(ProcessorContext> context) { } @Override - public void process(Record record) { + public void process(Record message) { try { // Get the record timestamp - var currentInstant = Instant.ofEpochMilli(record.timestamp()); + var currentInstant = Instant.ofEpochMilli(message.timestamp()); // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) - try (var resultIterator = dedupWindowStore.backwardFetch(record.key(), currentInstant.minus(retentionWindowDuration), currentInstant.plus(retentionWindowDuration))) { + try (var resultIterator = dedupWindowStore.backwardFetch(message.key(), + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { while (resultIterator != null && resultIterator.hasNext()) { var currentKeyValue = resultIterator.next(); - if (record.value().equals(currentKeyValue.value)) { + if (message.value().equals(currentKeyValue.value)) { return; } } } // First time we see this record, store entry in the window store and forward the record to the output - dedupWindowStore.put(record.key(), record.value(), record.timestamp()); - processorContext.forward(ProcessingResult.wrapRecordSuccess(record)); + dedupWindowStore.put(message.key(), message.value(), message.timestamp()); + processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); } catch (Exception e) { - processorContext.forward(ProcessingResult.wrapRecordFailure(e, record, "Couldn't figure out what to do with the current payload: An unlikely error occured during deduplication transform")); + processorContext.forward(ProcessingResult.wrapRecordFailure(e, message, + "Couldn't figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform")); } } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java index 0a0a8998..3df508f8 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java @@ -2,6 +2,8 @@ import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; +import java.util.function.Function; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.streams.processor.PunctuationType; import 
org.apache.kafka.streams.processor.api.Processor; @@ -10,49 +12,49 @@ import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; -import java.time.Duration; -import java.util.function.Function; - /** - * Transformer class for the deduplication mechanism on keys of a given topic + * Transformer class for the deduplication mechanism on keys of a given topic. * * @param The type of the key * @param The type of the value */ -public class DedupWithPredicateProcessor implements Processor> { +public class DedupWithPredicateProcessor + implements Processor> { /** - * Kstream context for this transformer + * Kstream context for this transformer. */ private ProcessorContext> processorContext; + /** - * Window store containing all the records seen on the given window + * Window store containing all the records seen on the given window. */ private TimestampedKeyValueStore dedupTimestampedStore; /** - * Window store name, initialized @ construction + * Window store name, initialized @ construction. */ private final String dedupStoreName; /** - * Retention window for the statestore. Used for fetching data + * Retention window for the statestore. Used for fetching data. */ private final Duration retentionWindowDuration; /** - * + * Deduplication key extractor. */ private final Function deduplicationKeyExtractor; /** - * Constructor method + * Constructor. * * @param dedupStoreName Name of the deduplication state store * @param retentionWindowDuration Retention window duration * @param deduplicationKeyExtractor Deduplication function */ - public DedupWithPredicateProcessor(String dedupStoreName, Duration retentionWindowDuration, Function deduplicationKeyExtractor) { + public DedupWithPredicateProcessor(String dedupStoreName, Duration retentionWindowDuration, + Function deduplicationKeyExtractor) { this.dedupStoreName = dedupStoreName; this.retentionWindowDuration = retentionWindowDuration; this.deduplicationKeyExtractor = deduplicationKeyExtractor; @@ -64,31 +66,36 @@ public void init(ProcessorContext> context) { dedupTimestampedStore = this.processorContext.getStateStore(dedupStoreName); - processorContext.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, (currentTimestamp) -> { - try (var iterator = dedupTimestampedStore.all()) { - while (iterator.hasNext()) { - var currentRecord = iterator.next(); - if (currentRecord.value.timestamp() + retentionWindowDuration.toMillis() < currentTimestamp) { - dedupTimestampedStore.delete(currentRecord.key); + processorContext.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, + currentTimestamp -> { + try (var iterator = dedupTimestampedStore.all()) { + while (iterator.hasNext()) { + var currentRecord = iterator.next(); + if (currentRecord.value.timestamp() + retentionWindowDuration.toMillis() + < currentTimestamp) { + dedupTimestampedStore.delete(currentRecord.key); + } } } - } - }); + }); } @Override - public void process(Record record) { + public void process(Record message) { try { - String identifier = deduplicationKeyExtractor.apply(record.value()); + String identifier = deduplicationKeyExtractor.apply(message.value()); // Retrieve the matching identifier in the statestore and return null if found it (signaling a duplicate) if (dedupTimestampedStore.get(identifier) == null) { - // First time we see this record, store entry in the windowstore and forward the record to the output - dedupTimestampedStore.put(identifier, ValueAndTimestamp.make(record.value(), 
record.timestamp())); - processorContext.forward(ProcessingResult.wrapRecordSuccess(record)); + // First time we see this record, store entry in the window store and forward the record to the output + dedupTimestampedStore.put(identifier, + ValueAndTimestamp.make(message.value(), message.timestamp())); + processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); } } catch (Exception e) { - processorContext.forward(ProcessingResult.wrapRecordFailure(e, record, "Couldn't figure out what to do with the current payload: An unlikely error occured during deduplication transform")); + processorContext.forward(ProcessingResult.wrapRecordFailure(e, message, + "Couldn't figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform")); } } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java index 14b3bfaa..3924c491 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java @@ -2,6 +2,8 @@ import com.michelin.kstreamplify.error.ProcessingResult; import com.michelin.kstreamplify.utils.SerdesUtils; +import java.time.Duration; +import java.util.function.Function; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -12,9 +14,6 @@ import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.WindowStore; -import java.time.Duration; -import java.util.function.Function; - /** * Deduplication utility class. Only streams with String keys are supported. */ @@ -38,34 +37,50 @@ private DeduplicationUtils() { * @param streamsBuilder Stream builder instance for topology editing * @param initialStream Stream containing the events that should be deduplicated * @param windowDuration Window of time on which we should watch out for duplicates - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a String as the key is retarded. You can quote me on this. + * @param Generic Type of the Stream value. + * Key type is not implemented because using anything other than + * a String as the key is retarded. + * You can quote me on this. * @return KStream with a processingResult */ - public static KStream> deduplicateKeys(StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { + public static KStream> deduplicateKeys( + StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { - return deduplicateKeys(streamsBuilder, initialStream, DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, windowDuration); + return deduplicateKeys(streamsBuilder, initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration); } /** - * Deduplicate the input stream on the input key using a window store for the given period of time + * Deduplicate the input stream on the input key using a window store for the given period of time. 
* * @param streamsBuilder Stream builder instance for topology editing * @param initialStream Stream containing the events that should be deduplicated * @param storeName Statestore name * @param repartitionName Repartition topic name * @param windowDuration Window of time to keep in the window store - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a String as the key is retarded. You can quote me on this. + * @param Generic Type of the Stream value. + * Key type is not implemented because using anything other than + * a String as the key is retarded. + * You can quote me on this. * @return Resulting de-duplicated Stream */ - public static KStream> deduplicateKeys(StreamsBuilder streamsBuilder, KStream initialStream, String storeName, String repartitionName, Duration windowDuration) { - - StoreBuilder> dedupStore = Stores.timestampedKeyValueStoreBuilder( - Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), Serdes.String()); + public static KStream> deduplicateKeys( + StreamsBuilder streamsBuilder, KStream initialStream, String storeName, + String repartitionName, Duration windowDuration) { + + StoreBuilder> dedupStore = + Stores.timestampedKeyValueStoreBuilder( + Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), + Serdes.String()); streamsBuilder.addStateStore(dedupStore); - var repartitioned = initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getSerdesForValue()).withName(repartitionName)); - return repartitioned.process(() -> new DedupKeyProcessor<>(storeName, windowDuration), storeName); + var repartitioned = initialStream.repartition( + Repartitioned.with(Serdes.String(), SerdesUtils.getSerdesForValue()) + .withName(repartitionName)); + return repartitioned.process(() -> new DedupKeyProcessor<>(storeName, windowDuration), + storeName); } /** @@ -75,12 +90,18 @@ public static KStream> * @param streamsBuilder Stream builder instance for topology editing * @param initialStream Stream containing the events that should be deduplicated * @param windowDuration Window of time on which we should watch out for duplicates - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a String as the key is retarded. You can quote me on this. + * @param Generic Type of the Stream value. + * Key type is not implemented because using anything other than + * a String as the key is retarded. + * You can quote me on this. * @return KStream with a processingResult */ - public static KStream> deduplicateKeyValues(StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { + public static KStream> deduplicateKeyValues( + StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { - return deduplicateKeyValues(streamsBuilder, initialStream, DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, windowDuration); + return deduplicateKeyValues(streamsBuilder, initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration); } /** @@ -92,38 +113,63 @@ public static KStream> * @param storeName Statestore name * @param repartitionName Repartition topic name * @param windowDuration Window of time to keep in the window store - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a String as the key is retarded. You can quote me on this. 
+ * @param Generic Type of the Stream value. + * Key type is not implemented because using anything other + * than a String as the key is retarded. + * You can quote me on this. * @return Resulting de-duplicated Stream */ - public static KStream> deduplicateKeyValues(StreamsBuilder streamsBuilder, KStream initialStream, String storeName, String repartitionName, Duration windowDuration) { + public static KStream> deduplicateKeyValues( + StreamsBuilder streamsBuilder, KStream initialStream, String storeName, + String repartitionName, Duration windowDuration) { StoreBuilder> dedupWindowStore = Stores.windowStoreBuilder( - Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), Serdes.String(), SerdesUtils.getSerdesForValue()); + Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), + Serdes.String(), SerdesUtils.getSerdesForValue()); streamsBuilder.addStateStore(dedupWindowStore); - var repartitioned = initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getSerdesForValue()).withName(repartitionName)); - return repartitioned.process(() -> new DedupKeyValueProcessor<>(storeName, windowDuration), storeName); + var repartitioned = initialStream.repartition( + Repartitioned.with(Serdes.String(), SerdesUtils.getSerdesForValue()) + .withName(repartitionName)); + return repartitioned.process(() -> new DedupKeyValueProcessor<>(storeName, windowDuration), + storeName); } /** - *

Deduplicate the input stream by applying the deduplicationKeyExtractor function on each record to generate a unique signature for the record. + * Deduplicate the input stream by applying the deduplicationKeyExtractor function on each record + * to generate a unique signature for the record. * Uses a window store for the given period of time. * The input stream should have a String key. - * ⚠ This constructor should not be used if using the deduplicator multiple times in the same topology. Use {@link DeduplicationUtils#deduplicateWithPredicate(StreamsBuilder, KStream, String storeName, String repartitionName, Duration, Function)} in this scenario. + * ⚠ This constructor should not be used if using the deduplicator multiple times in the same topology. + Use {@link + DeduplicationUtils#deduplicateWithPredicate(StreamsBuilder, KStream, String storeName, + String repartitionName, Duration, Function)} + in this scenario.

* * @param streamsBuilder Stream builder instance for topology editing * @param initialStream Stream containing the events that should be deduplicated * @param windowDuration Window of time to keep in the window store - * @param deduplicationKeyExtractor Function that should extract a deduplication key in String format. This key acts like a comparison vector. A recommended approach is to concatenate all necessary fields in String format to provide a unique identifier for comparison between records. - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a String as the key is retarded. You can quote me on this. + * @param deduplicationKeyExtractor Function that should extract a deduplication key in String format. + * This key acts like a comparison vector. + * A recommended approach is to concatenate all necessary fields in String format + * to provide a unique identifier for comparison between records. + * @param Generic Type of the Stream value. + * Key type is not implemented because using anything other + * than a String as the key is retarded. + * You can quote me on this. * @return Resulting de-duplicated Stream */ - public static KStream> deduplicateWithPredicate(StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration, Function deduplicationKeyExtractor) { - return deduplicateWithPredicate(streamsBuilder, initialStream, DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, windowDuration, deduplicationKeyExtractor); + public static KStream> deduplicateWithPredicate( + StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration, + Function deduplicationKeyExtractor) { + return deduplicateWithPredicate(streamsBuilder, initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration, deduplicationKeyExtractor); } /** - *

Deduplicate the input stream by applying the deduplicationKeyExtractor function on each record to generate a unique signature for the record. + * Deduplicate the input stream by applying the deduplicationKeyExtractor function + * on each record to generate a unique signature for the record. * Uses a window store for the given period of time. * The input stream should have a String key.
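As a companion to the javadoc above, a sketch of the predicate-based variant, assuming the same stripped generics (String key, SpecificRecord value). The positional field access in the key extractor is only because the value type is left abstract here; with a generated Avro class you would concatenate named business fields instead.

```java
import com.michelin.kstreamplify.deduplication.DeduplicationUtils;
import com.michelin.kstreamplify.error.ProcessingResult;
import com.michelin.kstreamplify.error.TopologyErrorHandler;
import java.time.Duration;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class DeduplicateWithPredicateSketch {
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, SpecificRecord> orders = streamsBuilder.stream("ORDER_TOPIC");

        // The extractor builds the comparison signature used to spot duplicates.
        KStream<String, ProcessingResult<SpecificRecord, SpecificRecord>> deduplicated =
            DeduplicationUtils.deduplicateWithPredicate(streamsBuilder, orders,
                Duration.ofDays(1),
                order -> order.get(0) + "#" + order.get(1));

        TopologyErrorHandler.catchErrors(deduplicated).to("ORDER_DEDUP_TOPIC");
    }
}
```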

* @@ -132,16 +178,31 @@ public static KStream> * @param storeName Statestore name * @param repartitionName Repartition topic name * @param windowDuration Window of time to keep in the window store - * @param deduplicationKeyExtractor Function that should extract a deduplication key in String format. This key acts like a comparison vector. A recommended approach is to concatenate all necessary fields in String format to provide a unique identifier for comparison between records. - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a String as the key is retarded. You can quote me on this. + * @param deduplicationKeyExtractor Function that should extract a deduplication key in String format. + * This key acts like a comparison vector. + * A recommended approach is to concatenate all necessary fields + * in String format to provide a unique identifier for comparison between records. + * @param Generic Type of the Stream value. + * Key type is not implemented because using anything other than + * a String as the key is retarded. + * You can quote me on this. * @return Resulting de-duplicated Stream */ - public static KStream> deduplicateWithPredicate(StreamsBuilder streamsBuilder, KStream initialStream, String storeName, String repartitionName, Duration windowDuration, Function deduplicationKeyExtractor) { - StoreBuilder> dedupStore = Stores.timestampedKeyValueStoreBuilder( - Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), SerdesUtils.getSerdesForValue()); + public static KStream> deduplicateWithPredicate( + StreamsBuilder streamsBuilder, KStream initialStream, String storeName, + String repartitionName, Duration windowDuration, + Function deduplicationKeyExtractor) { + StoreBuilder> dedupStore = + Stores.timestampedKeyValueStoreBuilder( + Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), + SerdesUtils.getSerdesForValue()); streamsBuilder.addStateStore(dedupStore); - var repartitioned = initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getSerdesForValue()).withName(repartitionName)); - return repartitioned.process(() -> new DedupWithPredicateProcessor<>(storeName, windowDuration, deduplicationKeyExtractor), storeName); + var repartitioned = initialStream.repartition( + Repartitioned.with(Serdes.String(), SerdesUtils.getSerdesForValue()) + .withName(repartitionName)); + return repartitioned.process( + () -> new DedupWithPredicateProcessor<>(storeName, windowDuration, + deduplicationKeyExtractor), storeName); } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java index 3c1cc745..75abc4e8 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java @@ -2,25 +2,25 @@ import com.michelin.kstreamplify.avro.KafkaError; import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; +import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import 
org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.common.KafkaException; - -import java.util.Map; /** - * The class managing deserialization exceptions + * The class managing deserialization exceptions. */ @Slf4j -public class DlqDeserializationExceptionHandler extends DlqExceptionHandler implements DeserializationExceptionHandler { +public class DlqDeserializationExceptionHandler extends DlqExceptionHandler + implements DeserializationExceptionHandler { private static final Object GUARD = new Object(); /** - * Manage deserialization exceptions + * Manage deserialization exceptions. * * @param processorContext the processor context * @param consumerRecord the record to deserialize @@ -28,34 +28,50 @@ public class DlqDeserializationExceptionHandler extends DlqExceptionHandler impl * @return FAIL or CONTINUE */ @Override - public DeserializationHandlerResponse handle(ProcessorContext processorContext, ConsumerRecord consumerRecord, Exception consumptionException) { + public DeserializationHandlerResponse handle(ProcessorContext processorContext, + ConsumerRecord consumerRecord, + Exception consumptionException) { if (StringUtils.isBlank(KafkaStreamsExecutionContext.getDlqTopicName())) { - log.warn("Failed to route deserialization error to the designated DLQ (Dead Letter Queue) topic. Please make sure to define a DLQ topic in your KafkaStreamsStarter bean configuration."); + log.warn( + "Failed to route deserialization error to the designated DLQ (Dead Letter Queue) topic. " + + + "Please make sure to define a DLQ topic in your KafkaStreamsStarter bean configuration."); return DeserializationHandlerResponse.FAIL; } try { var builder = KafkaError.newBuilder(); - enrichWithException(builder, consumptionException, consumerRecord.key(), consumerRecord.value()) - .setContextMessage("An exception occurred during the stream internal deserialization") - .setOffset(consumerRecord.offset()) - .setPartition(consumerRecord.partition()) - .setTopic(consumerRecord.topic()); + enrichWithException(builder, consumptionException, consumerRecord.key(), + consumerRecord.value()) + .setContextMessage( + "An exception occurred during the stream internal deserialization") + .setOffset(consumerRecord.offset()) + .setPartition(consumerRecord.partition()) + .setTopic(consumerRecord.topic()); boolean isCausedByKafka = consumptionException.getCause() instanceof KafkaException; - //If the cause of this exception is a KafkaException and if getCause == sourceException (see Throwable.getCause - including SerializationException) - //use to handle poison pill => sent message into dlq and continue our life. - if(isCausedByKafka || consumptionException.getCause() == null) { - producer.send(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), consumerRecord.key(), builder.build())).get(); + // If the cause of this exception is a KafkaException and if getCause == sourceException + // (see Throwable.getCause - including SerializationException) + // use to handle poison pill => sent message into dlq and continue our life. 
+ if (isCausedByKafka || consumptionException.getCause() == null) { + producer.send(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), + consumerRecord.key(), builder.build())).get(); return DeserializationHandlerResponse.CONTINUE; } } catch (InterruptedException ie) { - log.error("Interruption while sending the deserialization exception {} for key {}, value {} and topic {} to DLQ topic {}", consumptionException, - consumerRecord.key(), consumerRecord.value(), consumerRecord.topic(), KafkaStreamsExecutionContext.getDlqTopicName(), ie); + log.error( + "Interruption while sending the deserialization exception {} for key {}, " + + "value {} and topic {} to DLQ topic {}", + consumptionException, + consumerRecord.key(), consumerRecord.value(), consumerRecord.topic(), + KafkaStreamsExecutionContext.getDlqTopicName(), ie); Thread.currentThread().interrupt(); } catch (Exception e) { - log.error("Cannot send the deserialization exception {} for key {}, value {} and topic {} to DLQ topic {}", consumptionException, - consumerRecord.key(), consumerRecord.value(), consumerRecord.topic(), KafkaStreamsExecutionContext.getDlqTopicName(), e); + log.error( + "Cannot send the deserialization exception {} for key {}, value {} and topic {} to DLQ topic {}", + consumptionException, + consumerRecord.key(), consumerRecord.value(), consumerRecord.topic(), + KafkaStreamsExecutionContext.getDlqTopicName(), e); } // here we only have exception like UnknownHostException for example or TimeoutException ... diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java index 7d635d25..84e723b8 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java @@ -2,51 +2,56 @@ import com.michelin.kstreamplify.avro.KafkaError; import io.confluent.kafka.serializers.KafkaAvroSerializer; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.errors.RecordTooLargeException; -import org.apache.kafka.common.serialization.ByteArraySerializer; - import java.io.PrintWriter; import java.io.StringWriter; import java.nio.ByteBuffer; import java.util.Map; import java.util.Properties; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.serialization.ByteArraySerializer; /** - * The class to manage DLQ exception + * The class to manage DLQ exception. */ @Slf4j public abstract class DlqExceptionHandler { /** - * The DLQ producer + * The DLQ producer. */ protected static KafkaProducer producer; /** - * Create a producer + * Create a producer. 
+ * * @param clientId The producer client id - * @param configs The producer configs + * @param configs The producer configs */ protected static void instantiateProducer(String clientId, Map configs) { Properties properties = new Properties(); properties.putAll(configs); - properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); + properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + ByteArraySerializer.class.getName()); + properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + KafkaAvroSerializer.class.getName()); properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, clientId); producer = new KafkaProducer<>(properties); } /** - * enrich with exception - * @param builder the error builder + * Enrich with exception. + * + * @param builder the error builder * @param exception the exception to add - * @param key the record key - * @param value the record value + * @param key the record key + * @param value the record value * @return the error enriched by the exception */ - protected KafkaError.Builder enrichWithException(KafkaError.Builder builder, Exception exception, byte[] key, byte[] value) { + protected KafkaError.Builder enrichWithException(KafkaError.Builder builder, + Exception exception, byte[] key, + byte[] value) { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); exception.printStackTrace(pw); @@ -54,9 +59,11 @@ protected KafkaError.Builder enrichWithException(KafkaError.Builder builder, Exc boolean tooLarge = exception instanceof RecordTooLargeException; return builder - .setCause(exception.getCause() != null ? exception.getCause().getMessage() : "Unknown cause") - .setValue(tooLarge ? "The record is too large to be set as value (" + value.length + " bytes). The key will be used instead" : null) - .setStack(sw.toString()) - .setByteValue(tooLarge ? ByteBuffer.wrap(key) : ByteBuffer.wrap(value)); + .setCause( + exception.getCause() != null ? exception.getCause().getMessage() : "Unknown cause") + .setValue(tooLarge ? "The record is too large to be set as value (" + value.length + + " bytes). The key will be used instead" : null) + .setStack(sw.toString()) + .setByteValue(tooLarge ? ByteBuffer.wrap(key) : ByteBuffer.wrap(value)); } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java index ad685047..7e376adc 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java @@ -2,31 +2,36 @@ import com.michelin.kstreamplify.avro.KafkaError; import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; +import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.streams.errors.ProductionExceptionHandler; -import java.util.Map; - /** - * The class managing DLQ production exceptions + * The class managing DLQ production exceptions. 
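Both handlers touched by this diff are plugged into Kafka Streams through the standard exception-handler properties. A minimal sketch, assuming these entries end up in the Kafka properties of a Kstreamplify application:

```java
import com.michelin.kstreamplify.error.DlqDeserializationExceptionHandler;
import com.michelin.kstreamplify.error.DlqProductionExceptionHandler;
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class DlqHandlerConfigSketch {
    public static Properties dlqHandlerProperties() {
        Properties properties = new Properties();
        // Poison pills caught during deserialization are forwarded to the configured DLQ topic.
        properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            DlqDeserializationExceptionHandler.class.getName());
        // Non-retriable production errors are forwarded to the DLQ topic as well.
        properties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
            DlqProductionExceptionHandler.class.getName());
        return properties;
    }
}
```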
*/ @Slf4j -public class DlqProductionExceptionHandler extends DlqExceptionHandler implements ProductionExceptionHandler { +public class DlqProductionExceptionHandler extends DlqExceptionHandler + implements ProductionExceptionHandler { private static final Object GUARD = new Object(); /** - * Manage production exceptions - * @param producerRecord the record to produce + * Manage production exceptions. + * + * @param producerRecord the record to produce * @param productionException the exception on producing * @return FAIL or CONTINUE */ @Override - public ProductionExceptionHandlerResponse handle(ProducerRecord producerRecord, Exception productionException) { + public ProductionExceptionHandlerResponse handle(ProducerRecord producerRecord, + Exception productionException) { if (StringUtils.isBlank(KafkaStreamsExecutionContext.getDlqTopicName())) { - log.warn("Failed to route production error to the designated DLQ (Dead Letter Queue) topic. Please make sure to define a DLQ topic in your KafkaStreamsStarter bean configuration."); + log.warn( + "Failed to route production error to the designated DLQ (Dead Letter Queue) topic. " + + + "Please make sure to define a DLQ topic in your KafkaStreamsStarter bean configuration."); return ProductionExceptionHandlerResponse.FAIL; } @@ -35,20 +40,31 @@ public ProductionExceptionHandlerResponse handle(ProducerRecord if (!retryable) { try { var builder = KafkaError.newBuilder(); - enrichWithException(builder, productionException, producerRecord.key(), producerRecord.value()) - .setContextMessage("An exception occurred during the stream internal production") - .setOffset(-1) - .setPartition(producerRecord.partition() == null ? -1 : producerRecord.partition()) - .setTopic(producerRecord.topic()); + enrichWithException(builder, productionException, producerRecord.key(), + producerRecord.value()) + .setContextMessage( + "An exception occurred during the stream internal production") + .setOffset(-1) + .setPartition( + producerRecord.partition() == null ? 
-1 : producerRecord.partition()) + .setTopic(producerRecord.topic()); - producer.send(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), producerRecord.key(), builder.build())).get(); + producer.send(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), + producerRecord.key(), builder.build())).get(); } catch (InterruptedException ie) { - log.error("Interruption while sending the production exception {} for key {}, value {} and topic {} to DLQ topic {}", productionException, - producerRecord.key(), producerRecord.value(), producerRecord.topic(), KafkaStreamsExecutionContext.getDlqTopicName(), ie); + log.error( + "Interruption while sending the production exception {} for key {}, value {} " + + "and topic {} to DLQ topic {}", + productionException, + producerRecord.key(), producerRecord.value(), producerRecord.topic(), + KafkaStreamsExecutionContext.getDlqTopicName(), ie); Thread.currentThread().interrupt(); } catch (Exception e) { - log.error("Cannot send the production exception {} for key {}, value {} and topic {} to DLQ topic {}", productionException, - producerRecord.key(), producerRecord.value(), producerRecord.topic(), KafkaStreamsExecutionContext.getDlqTopicName(), e); + log.error( + "Cannot send the production exception {} for key {}, value {} and topic {} to DLQ topic {}", + productionException, + producerRecord.key(), producerRecord.value(), producerRecord.topic(), + KafkaStreamsExecutionContext.getDlqTopicName(), e); return ProductionExceptionHandlerResponse.CONTINUE; } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java index 70aeca41..73e60f5f 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java @@ -1,23 +1,25 @@ package com.michelin.kstreamplify.error; import com.michelin.kstreamplify.avro.KafkaError; +import java.io.PrintWriter; +import java.io.StringWriter; import org.apache.kafka.streams.processor.api.FixedKeyProcessor; import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; import org.apache.kafka.streams.processor.api.FixedKeyRecord; import org.apache.kafka.streams.processor.api.RecordMetadata; -import java.io.PrintWriter; -import java.io.StringWriter; - /** - * Generic error processor + * Generic error processor. + * * @param The type of the failed record */ -public class GenericErrorProcessor implements FixedKeyProcessor, KafkaError> { +public class GenericErrorProcessor + implements FixedKeyProcessor, KafkaError> { private FixedKeyProcessorContext context; /** - * init context + * Init context. + * * @param context the context to init */ @Override @@ -26,7 +28,8 @@ public void init(FixedKeyProcessorContext context) { } /** - * process the error + * Process the error. + * * @param fixedKeyRecord the record to process an error */ @Override @@ -38,14 +41,16 @@ public void process(FixedKeyRecord> fixedKeyRecord) { RecordMetadata recordMetadata = context.recordMetadata().orElse(null); KafkaError error = KafkaError.newBuilder() - .setCause(fixedKeyRecord.value().getException().getMessage()) - .setContextMessage(fixedKeyRecord.value().getContextMessage()) - .setOffset(recordMetadata != null ? recordMetadata.offset() : -1) - .setPartition(recordMetadata != null ? 
recordMetadata.partition() : -1) - .setStack(sw.toString()) - .setTopic(recordMetadata != null && recordMetadata.topic() != null ? recordMetadata.topic() : "Outside topic context") - .setValue(fixedKeyRecord.value().getKafkaRecord()) - .build(); + .setCause(fixedKeyRecord.value().getException().getMessage()) + .setContextMessage(fixedKeyRecord.value().getContextMessage()) + .setOffset(recordMetadata != null ? recordMetadata.offset() : -1) + .setPartition(recordMetadata != null ? recordMetadata.partition() : -1) + .setStack(sw.toString()) + .setTopic( + recordMetadata != null && recordMetadata.topic() != null ? recordMetadata.topic() : + "Outside topic context") + .setValue(fixedKeyRecord.value().getKafkaRecord()) + .build(); context.forward(fixedKeyRecord.withValue(error)); } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingError.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingError.java index c4992ecd..0547388c 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingError.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingError.java @@ -5,31 +5,33 @@ import org.apache.avro.generic.GenericRecord; /** - * The processing error class + * The processing error class. + * * @param The type of the failed record */ @Getter public class ProcessingError { /** - * The exception that occurred + * The exception that occurred. */ private final Exception exception; /** - * The failed Kafka record + * The failed Kafka record. */ private final String kafkaRecord; /** - * A context message defined when the error is caught + * A context message defined when the error is caught. */ private final String contextMessage; /** - * Constructor - * @param exception The exception + * Constructor. + * + * @param exception The exception * @param contextMessage The context message - * @param kafkaRecord The failed Kafka record + * @param kafkaRecord The failed Kafka record */ public ProcessingError(Exception exception, String contextMessage, V kafkaRecord) { this.exception = exception; @@ -43,8 +45,9 @@ public ProcessingError(Exception exception, String contextMessage, V kafkaRecord } /** - * Constructor - * @param exception The exception + * Constructor. + * + * @param exception The exception * @param kafkaRecord The failed Kafka record */ public ProcessingError(Exception exception, V kafkaRecord) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingResult.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingResult.java index 9454c353..858ccc63 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingResult.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingResult.java @@ -4,7 +4,7 @@ import org.apache.kafka.streams.processor.api.Record; /** - * The processing result class + * The processing result class. * * @param The type of the successful record * @param The type of the failed record @@ -12,17 +12,17 @@ @Getter public class ProcessingResult { /** - * The successful record + * The successful record. */ private V value; /** - * The failed record wrapped in a processing error + * The failed record wrapped in a processing error. */ private ProcessingError error; /** - * Private constructor that sets the success value + * Private constructor that sets the success value. 
* * @param value The success value */ @@ -31,7 +31,7 @@ private ProcessingResult(V value) { } /** - * Private constructor that sets the error value + * Private constructor that sets the error value. * * @param error the ProcessingError containing the */ @@ -40,7 +40,7 @@ private ProcessingResult(ProcessingError error) { } /** - * Create a successful processing result + * Create a successful processing result. * * @param value The successful record value * @param The type of the successful record @@ -54,16 +54,19 @@ public static ProcessingResult success(V value) { /** *

Wraps a record's value with ProcessingResult.success(V value). - * The resulting stream needs to be handled with TopologyErrorHandler#catchErrors(KStream) for automatic DLQ redirection on failed records. + * The resulting stream needs to be handled with TopologyErrorHandler#catchErrors(KStream) + * for automatic DLQ redirection on failed records.
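The wrapping pattern described here is easiest to see from a processor. A minimal sketch with a hypothetical String-valued processor; the call pattern mirrors the deduplication processors reformatted earlier in this diff:

```java
import com.michelin.kstreamplify.error.ProcessingResult;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

/** Hypothetical processor: upper-cases the value and wraps success or failure for the DLQ flow. */
public class UppercaseProcessorSketch
    implements Processor<String, String, String, ProcessingResult<String, String>> {

    private ProcessorContext<String, ProcessingResult<String, String>> context;

    @Override
    public void init(ProcessorContext<String, ProcessingResult<String, String>> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> message) {
        try {
            Record<String, String> transformed = message.withValue(message.value().toUpperCase());
            context.forward(ProcessingResult.wrapRecordSuccess(transformed));
        } catch (Exception e) {
            context.forward(ProcessingResult.wrapRecordFailure(e, message,
                "Could not upper-case the incoming value"));
        }
    }
}
```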

* - * @param record The resulting successful Record from the processor that needs to be wrapped in a ProcessingResult - * @param The type of the record key - * @param The type of the ProcessingResult successful value - * @param The type of the ProcessingResult error value + * @param message The resulting successful Record from the processor that needs to be wrapped in a ProcessingResult + * @param The type of the record key + * @param The type of the ProcessingResult successful value + * @param The type of the ProcessingResult error value * @return The initial Record, with value wrapped in a ProcessingResult */ - public static Record> wrapRecordSuccess(Record record) { - return new Record<>(record.key(), ProcessingResult.success(record.value()), record.timestamp()); + public static Record> wrapRecordSuccess( + Record message) { + return new Record<>(message.key(), ProcessingResult.success(message.value()), + message.timestamp()); } /** @@ -71,7 +74,8 @@ public static Record> wrapRecordSuccess(Re * Wraps a key, value and timestamp in a Record with ProcessingResult#success(V value) as value. *

*

- * The resulting stream needs to be handled with TopologyErrorHandler#catchErrors(KStream) for automatic DLQ redirection on failed records. + * The resulting stream needs to be handled with TopologyErrorHandler#catchErrors(KStream) + * for automatic DLQ redirection on failed records. *

* * @param key The key to put in the resulting record @@ -82,16 +86,18 @@ public static Record> wrapRecordSuccess(Re * @param The type of the ProcessingResult error value * @return A Record with value wrapped in a {@link ProcessingResult} */ - public static Record> wrapRecordSuccess(K key, V value, long timestamp) { + public static Record> wrapRecordSuccess(K key, V value, + long timestamp) { return new Record<>(key, ProcessingResult.success(value), timestamp); } /** *

- * Create a failed processing result + * Create a failed processing result. *

*

- * If you are using this in a Processor, refer to {@link ProcessingResult#wrapRecordFailure(Exception, Record)} for easier syntax. + * If you are using this in a Processor, refer to + * {@link ProcessingResult#wrapRecordFailure(Exception, Record)} for easier syntax. *

* * @param e The exception @@ -109,7 +115,9 @@ public static ProcessingResult fail(Exception e, V2 failedRecordV * Create a failed processing result. *

*

- * If you are using this in a Processor, refer to {@link ProcessingResult#wrapRecordFailure(Exception, Record, String)} for easier syntax. + * If you are using this in a Processor, refer to + * {@link ProcessingResult#wrapRecordFailure(Exception, Record, String)} + * for easier syntax. *

* * @param e The exception @@ -119,7 +127,8 @@ public static ProcessingResult fail(Exception e, V2 failedRecordV * @param The type of the failed record * @return A processing result containing the failed record */ - public static ProcessingResult fail(Exception e, V2 failedRecordValue, String contextMessage) { + public static ProcessingResult fail(Exception e, V2 failedRecordValue, + String contextMessage) { return new ProcessingResult<>(new ProcessingError<>(e, contextMessage, failedRecordValue)); } @@ -128,18 +137,22 @@ public static ProcessingResult fail(Exception e, V2 failedRecordV * Wraps a record's value with {@link ProcessingResult#fail(Exception, Object)} )}. *

*

- * The resulting stream needs to be handled with TopologyErrorHandler#catchErrors(KStream) for automatic DLQ redirection on failed records. + * The resulting stream needs to be handled with TopologyErrorHandler#catchErrors(KStream) for automatic + * DLQ redirection on failed records. *

* * @param e The initial exception - * @param failedRecord The resulting failed Record from the processor that needs to be wrapped in a {@link ProcessingResult} + * @param failedRecord The resulting failed Record from + * the processor that needs to be wrapped in a {@link ProcessingResult} * @param The type of the record key * @param The type of the ProcessingResult successful value * @param The type of the ProcessingResult error value * @return The initial Record, with value wrapped in a {@link ProcessingError} and {@link ProcessingResult} */ - public static Record> wrapRecordFailure(Exception e, Record failedRecord) { - return new Record<>(failedRecord.key(), ProcessingResult.fail(e, failedRecord.value()), failedRecord.timestamp()); + public static Record> wrapRecordFailure(Exception e, + Record failedRecord) { + return new Record<>(failedRecord.key(), ProcessingResult.fail(e, failedRecord.value()), + failedRecord.timestamp()); } /** @@ -147,49 +160,64 @@ public static Record> wrapRecordFailure(Ex * Wraps a record's value with {@link ProcessingResult#fail(Exception, Object, String)}. *

*

- * The resulting stream needs to be handled with TopologyErrorHandler#catchErrors(KStream) for automatic DLQ redirection on failed records. + * The resulting stream needs to be handled with TopologyErrorHandler#catchErrors(KStream) + * for automatic DLQ redirection on failed records. *

* * @param e The initial exception - * @param failedRecord The resulting failed Record from the processor that needs to be wrapped in a {@link ProcessingResult} + * @param failedRecord The resulting failed Record from + * the processor that needs to be wrapped in a {@link ProcessingResult} * @param contextMessage The custom context message that will be added in the stack trace * @param The type of the record key * @param The type of the ProcessingResult successful value * @param The type of the ProcessingResult error value * @return The initial Record, with value wrapped in a {@link ProcessingError} and {@link ProcessingResult} */ - public static Record> wrapRecordFailure(Exception e, Record failedRecord, String contextMessage) { - return new Record<>(failedRecord.key(), ProcessingResult.fail(e, failedRecord.value(), contextMessage), failedRecord.timestamp()); + public static Record> wrapRecordFailure(Exception e, + Record failedRecord, + String contextMessage) { + return new Record<>(failedRecord.key(), + ProcessingResult.fail(e, failedRecord.value(), contextMessage), + failedRecord.timestamp()); } /** - *

Wraps a key, value and timestamp in a Record with {@link ProcessingResult#fail(Exception, Object, String)} as value. - * The resulting stream needs to be handled with TopologyErrorHandler#catchErrors(KStream) for automatic DLQ redirection on failed records. + * Wraps a key, value and timestamp in a Record with {@link ProcessingResult#fail(Exception, Object, String)} + * as value. + * The resulting stream needs to be handled with TopologyErrorHandler#catchErrors(KStream) for automatic + * DLQ redirection on failed records.
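When a processor builds its output from raw pieces rather than an existing Record, the key/value/timestamp overloads shown here apply. A small sketch with illustrative names:

```java
import com.michelin.kstreamplify.error.ProcessingResult;
import org.apache.kafka.streams.processor.api.Record;

public class WrapFailureSketch {
    /** Wraps a raw key, value and timestamp as a failed ProcessingResult record. */
    public static Record<String, ProcessingResult<String, String>> toFailedRecord(
            String key, String payload, long timestamp, Exception cause) {
        return ProcessingResult.wrapRecordFailure(cause, key, payload, timestamp,
            "Payload rejected during enrichment");
    }
}
```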

* * @param e The initial exception * @param key The key to put in the resulting record - * @param failedValue The resulting failed value from the processor that needs to be wrapped in a {@link ProcessingResult} + * @param failedValue The resulting failed value from + * the processor that needs to be wrapped in a {@link ProcessingResult} * @param timestamp The timestamp to apply on the resulting record * @param The type of the record key * @param The type of the ProcessingResult successful value * @param The type of the ProcessingResult error value * @return A Record with value wrapped in a {@link ProcessingError} and {@link ProcessingResult} */ - public static Record> wrapRecordFailure(Exception e, K key, V2 failedValue, long timestamp) { + public static Record> wrapRecordFailure(Exception e, + K key, + V2 failedValue, + long timestamp) { return new Record<>(key, ProcessingResult.fail(e, failedValue), timestamp); } /** *

- * Wraps a key, value and timestamp in a Record with {@link ProcessingResult#fail(Exception, Object, String)} as value. + * Wraps a key, value and timestamp in a Record + * with {@link ProcessingResult#fail(Exception, Object, String)} as value. *

*

- * The resulting stream needs to be handled with TopologyErrorHandler#catchErrors(KStream) for automatic DLQ redirection on failed records. + * The resulting stream needs to be handled with TopologyErrorHandler#catchErrors(KStream) for automatic + * DLQ redirection on failed records. *

* * @param e The initial exception * @param key The key to put in the resulting record - * @param failedValue The resulting failed value from the processor that needs to be wrapped in a {@link ProcessingResult} + * @param failedValue The resulting failed value from the processor + * that needs to be wrapped in a {@link ProcessingResult} * @param timestamp The timestamp to apply on the resulting record * @param contextMessage The custom context message that will be added in the stack trace * @param The type of the record key @@ -197,12 +225,16 @@ public static Record> wrapRecordFailure(Ex * @param The type of the ProcessingResult error value * @return A Record with value wrapped in a {@link ProcessingError} and {@link ProcessingResult} */ - public static Record> wrapRecordFailure(Exception e, K key, V2 failedValue, long timestamp, String contextMessage) { + public static Record> wrapRecordFailure(Exception e, + K key, + V2 failedValue, + long timestamp, + String contextMessage) { return new Record<>(key, ProcessingResult.fail(e, failedValue, contextMessage), timestamp); } /** - * Is the processing result valid ? + * Is the processing result valid. * Is it valid either if it contains a successful value or an error * * @return true if valid, false otherwise diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/TopologyErrorHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/TopologyErrorHandler.java index b3f12ea0..e8e6ee06 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/TopologyErrorHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/TopologyErrorHandler.java @@ -2,6 +2,7 @@ import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; import com.michelin.kstreamplify.utils.SerdesUtils; +import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.serialization.Serdes; @@ -11,79 +12,86 @@ import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Produced; -import java.util.Map; - /** - * The topology error handler class + * The topology error handler class. */ @Slf4j public class TopologyErrorHandler { private static final String BRANCHING_NAME_NOMINAL = "branch-nominal"; - private TopologyErrorHandler() { } + private TopologyErrorHandler() { + } /** - * Catch the errors from the given stream + * Catch the errors from the given stream. + * * @param stream The stream of processing result that may contain processing errors + * @param The key type + * @param The type of the successful record + * @param The type of the failed record * @return A stream filtered from all processing errors - * @param The key type - * @param The type of the successful record - * @param The type of the failed record */ - public static KStream catchErrors(KStream> stream) { + public static KStream catchErrors(KStream> stream) { return catchErrors(stream, false); } /** - * Catch the errors from the given stream - * @param stream The stream of processing result that may contain processing errors + * Catch the errors from the given stream. 
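A short sketch of how a topology consumes the catchErrors contract described in these hunks, assuming a stream of ProcessingResult values with an Avro payload:

```java
import com.michelin.kstreamplify.error.ProcessingResult;
import com.michelin.kstreamplify.error.TopologyErrorHandler;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.streams.kstream.KStream;

public class CatchErrorsSketch {
    /** Keeps only successful values; errors are converted to KafkaError and routed to the DLQ. */
    public static KStream<String, SpecificRecord> keepValid(
            KStream<String, ProcessingResult<SpecificRecord, SpecificRecord>> processed) {
        // The two-argument overload, catchErrors(processed, true), would also let tombstones through.
        return TopologyErrorHandler.catchErrors(processed);
    }
}
```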
+ * + * @param stream The stream of processing result that may contain processing errors * @param allowTombstone Allow sending tombstone in DLQ topic or to be returned + * @param The key type + * @param The type of the successful record + * @param The type of the failed record * @return A stream filtered from all processing errors - * @param The key type - * @param The type of the successful record - * @param The type of the failed record */ - public static KStream catchErrors(KStream> stream, boolean allowTombstone) { - Map>> branches; + public static KStream catchErrors(KStream> stream, + boolean allowTombstone) { + Map>> branches; String branchNamePrefix = stream.toString().split("@")[1]; if (!allowTombstone) { branches = stream - .filter((key, value) -> value != null) - .filterNot((key, value) -> value.getValue() == null && value.getError() == null) - .split(Named.as(branchNamePrefix)) - .branch((key, value) -> value.isValid(), Branched.as(BRANCHING_NAME_NOMINAL)) - .defaultBranch(Branched.withConsumer(ks -> TopologyErrorHandler.handleErrors(ks - .mapValues(ProcessingResult::getError)))); + .filter((key, value) -> value != null) + .filterNot((key, value) -> value.getValue() == null && value.getError() == null) + .split(Named.as(branchNamePrefix)) + .branch((key, value) -> value.isValid(), Branched.as(BRANCHING_NAME_NOMINAL)) + .defaultBranch(Branched.withConsumer(ks -> TopologyErrorHandler.handleErrors(ks + .mapValues(ProcessingResult::getError)))); } else { branches = stream - .filter((key, value) -> value != null) - .split(Named.as(branchNamePrefix)) - .branch((key, value) -> value.getError() == null, Branched.as(BRANCHING_NAME_NOMINAL)) - .defaultBranch(Branched.withConsumer(ks -> TopologyErrorHandler.handleErrors(ks - .mapValues(ProcessingResult::getError)))); + .filter((key, value) -> value != null) + .split(Named.as(branchNamePrefix)) + .branch((key, value) -> value.getError() == null, + Branched.as(BRANCHING_NAME_NOMINAL)) + .defaultBranch(Branched.withConsumer(ks -> TopologyErrorHandler.handleErrors(ks + .mapValues(ProcessingResult::getError)))); } return branches - .get(branchNamePrefix + BRANCHING_NAME_NOMINAL) - .mapValues(ProcessingResult::getValue); + .get(branchNamePrefix + BRANCHING_NAME_NOMINAL) + .mapValues(ProcessingResult::getValue); } /** - * Process a stream of processing errors and route it to the configured DLQ topic + * Process a stream of processing errors and route it to the configured DLQ topic. + * * @param errorsStream The stream of processing errors - * @param The key type - * @param The value type + * @param The key type + * @param The value type */ private static void handleErrors(KStream> errorsStream) { if (StringUtils.isBlank(KafkaStreamsExecutionContext.getDlqTopicName())) { - log.warn("Failed to route topology error to the designated DLQ (Dead Letter Queue) topic. Please make sure to define a DLQ topic in your KafkaStreamsStarter bean configuration."); + log.warn( + "Failed to route topology error to the designated DLQ (Dead Letter Queue) topic. " + + + "Please make sure to define a DLQ topic in your KafkaStreamsStarter bean configuration."); return; } errorsStream - .map((key, value) -> new KeyValue<>(key == null ? "null" : key.toString(), value)) - .processValues(GenericErrorProcessor::new) - .to(KafkaStreamsExecutionContext.getDlqTopicName(), Produced.with(Serdes.String(), - SerdesUtils.getSerdesForValue())); + .map((key, value) -> new KeyValue<>(key == null ? 
"null" : key.toString(), value)) + .processValues(GenericErrorProcessor::new) + .to(KafkaStreamsExecutionContext.getDlqTopicName(), Produced.with(Serdes.String(), + SerdesUtils.getSerdesForValue())); } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java index ffa821ee..a7c311c0 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java @@ -1,9 +1,14 @@ package com.michelin.kstreamplify.initializer; +import static com.michelin.kstreamplify.constants.InitializerConstants.SERVER_PORT_PROPERTY; + import com.michelin.kstreamplify.constants.InitializerConstants; import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; import com.michelin.kstreamplify.properties.PropertiesUtils; import com.michelin.kstreamplify.rest.DefaultProbeController; +import java.util.HashMap; +import java.util.Properties; +import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -14,70 +19,65 @@ import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.state.HostInfo; -import java.util.HashMap; -import java.util.Properties; -import java.util.stream.Collectors; - -import static com.michelin.kstreamplify.constants.InitializerConstants.SERVER_PORT_PROPERTY; - /** - * The Kafka Streams initializer class + * The Kafka Streams initializer class. */ @Slf4j @Getter public class KafkaStreamsInitializer { /** - * The Kafka Streams instance + * The Kafka Streams instance. */ private KafkaStreams kafkaStreams; /** - * The Kafka Streams starter + * The Kafka Streams starter. */ private KafkaStreamsStarter kafkaStreamsStarter; /** - * The topology + * The topology. */ private Topology topology; /** - * The Kafka properties + * The Kafka properties. */ protected Properties kafkaProperties; /** - * The application properties + * The application properties. */ protected Properties properties; /** - * The DLQ topic + * The DLQ topic. */ private String dlq; /** - * The host info + * The host info. */ private HostInfo hostInfo; /** - * The server port + * The server port. */ protected int serverPort; /** - * Init the Kafka Streams - * @param kStreamsStarter The Kafka Streams starter + * Init the Kafka Streams. + * + * @param streamsStarter The Kafka Streams starter */ - public void init(KafkaStreamsStarter kStreamsStarter) { - kafkaStreamsStarter = kStreamsStarter; - + public void init(KafkaStreamsStarter streamsStarter) { + kafkaStreamsStarter = streamsStarter; + initProperties(); - + initSerdesConfig(); - initDLQ(); + initDlq(); initHostInfo(); @@ -103,61 +103,64 @@ public void init(KafkaStreamsStarter kStreamsStarter) { } /** - * Init the Kafka Streams execution context + * Init the Kafka Streams execution context. 
*/ private void initSerdesConfig() { KafkaStreamsExecutionContext.setSerdesConfig( - kafkaProperties.entrySet().stream().collect( - Collectors.toMap( - e -> String.valueOf(e.getKey()), - e -> String.valueOf(e.getValue()), - (prev, next) -> next, HashMap::new - )) + kafkaProperties.entrySet().stream().collect( + Collectors.toMap( + e -> String.valueOf(e.getKey()), + e -> String.valueOf(e.getValue()), + (prev, next) -> next, HashMap::new + )) ); } - + /** - * Init the Kafka Streams default DLQ + * Init the Kafka Streams default DLQ. */ - private void initDLQ() { + private void initDlq() { dlq = kafkaStreamsStarter.dlqTopic(); KafkaStreamsExecutionContext.setDlqTopicName(dlq); } /** - * Init the host information + * Init the host information. */ private void initHostInfo() { - String ipEnvVarName = (String) kafkaProperties.get(InitializerConstants.IP_SYSTEM_VARIABLE_PROPERTY); + String ipEnvVarName = + (String) kafkaProperties.get(InitializerConstants.IP_SYSTEM_VARIABLE_PROPERTY); if (StringUtils.isBlank(ipEnvVarName)) { ipEnvVarName = InitializerConstants.IP_SYSTEM_VARIABLE_DEFAULT; } - String myIP = System.getenv(ipEnvVarName); - String host = StringUtils.isNotBlank(myIP) ? myIP : InitializerConstants.LOCALHOST; + String myIp = System.getenv(ipEnvVarName); + String host = StringUtils.isNotBlank(myIp) ? myIp : InitializerConstants.LOCALHOST; hostInfo = new HostInfo(host, serverPort); - log.info("The Kafka Streams \"{}\" is running on {}:{}", KafkaStreamsExecutionContext.getProperties() - .getProperty(StreamsConfig.APPLICATION_ID_CONFIG), hostInfo.host(), hostInfo.port()); + log.info("The Kafka Streams \"{}\" is running on {}:{}", + KafkaStreamsExecutionContext.getProperties() + .getProperty(StreamsConfig.APPLICATION_ID_CONFIG), hostInfo.host(), + hostInfo.port()); KafkaStreamsExecutionContext.getProperties().put(StreamsConfig.APPLICATION_SERVER_CONFIG, - String.format("%s:%s", hostInfo.host(), hostInfo.port())); + String.format("%s:%s", hostInfo.host(), hostInfo.port())); } /** - * Init the HTTP server + * Init the HTTP server. */ protected void initHttpServer() { new DefaultProbeController(this); } /** - * Init all properties + * Init all properties. */ protected void initProperties() { properties = PropertiesUtils.loadProperties(); - serverPort = (Integer) properties.get(SERVER_PORT_PROPERTY);; + serverPort = (Integer) properties.get(SERVER_PORT_PROPERTY); kafkaProperties = PropertiesUtils.loadKafkaProperties(properties); @@ -165,25 +168,28 @@ protected void initProperties() { } /** - * Default uncaught exception handler + * Default uncaught exception handler. + * * @param exception The exception * @return The execution */ - protected StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse onStreamsUncaughtException(Throwable exception) { + protected StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse onStreamsUncaughtException( + Throwable exception) { log.error("A not covered exception occurred in {} Kafka Streams. Shutting down...", - kafkaProperties.get(StreamsConfig.APPLICATION_ID_CONFIG), exception); + kafkaProperties.get(StreamsConfig.APPLICATION_ID_CONFIG), exception); return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; } /** - * Default state change listener + * Default state change listener. 
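Since the uncaught-exception and state-change hooks shown here are protected, a deployment can override them in a subclass. A sketch under that assumption; replacing the failed stream thread instead of shutting the client down is purely illustrative:

```java
import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

/** Hypothetical subclass that prefers replacing the failed stream thread. */
public class ResilientInitializerSketch extends KafkaStreamsInitializer {
    @Override
    protected StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse onStreamsUncaughtException(
        Throwable exception) {
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
    }
}
```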
+ * * @param newState The new state * @param oldState The old state */ protected void onStateChange(KafkaStreams.State newState, KafkaStreams.State oldState) { if (newState.equals(KafkaStreams.State.ERROR)) { log.error("The {} Kafka Streams is in error state...", - kafkaProperties.get(StreamsConfig.APPLICATION_ID_CONFIG)); + kafkaProperties.get(StreamsConfig.APPLICATION_ID_CONFIG)); System.exit(3); } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java index c16bbcd6..9648220d 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java @@ -4,17 +4,18 @@ import org.apache.kafka.streams.StreamsBuilder; /** - * The Kafka Streams starter interface + * The Kafka Streams starter interface. */ public abstract class KafkaStreamsStarter { /** - * Define the topology of the Kafka Streams + * Define the topology of the Kafka Streams. + * * @param streamsBuilder The streams builder */ public abstract void topology(StreamsBuilder streamsBuilder); /** - *

Define the dead letter queue (DLQ) topic + * Define the dead letter queue (DLQ) topic. * If you don't want to use the DLQ topic, you can return {@link org.apache.commons.lang3.StringUtils#EMPTY}
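For context, a minimal starter built against the contract shown in this hunk; the topic names are illustrative and the manual bootstrap assumes the core module's initializer with its default constructor:

```java
import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import org.apache.kafka.streams.StreamsBuilder;

/** Hypothetical starter: copies records from one topic to another and declares a DLQ. */
public class MyKafkaStreamsStarter extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        streamsBuilder.stream("INPUT_TOPIC").to("OUTPUT_TOPIC");
    }

    @Override
    public String dlqTopic() {
        return "MY_APP_DLQ_TOPIC";
    }

    public static void main(String[] args) {
        new KafkaStreamsInitializer().init(new MyKafkaStreamsStarter());
    }
}
```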

* * @return The dead letter queue (DLQ) topic @@ -22,8 +23,10 @@ public abstract class KafkaStreamsStarter { public abstract String dlqTopic(); /** - * Define runnable code after the Kafka Streams startup + * Define runnable code after the Kafka Streams startup. + * * @param kafkaStreams The Kafka Streams instance */ - public void onStart(KafkaStreams kafkaStreams) { } + public void onStart(KafkaStreams kafkaStreams) { + } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/DlqTopic.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/DlqTopic.java index f92d3e9f..9bd57a1b 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/DlqTopic.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/DlqTopic.java @@ -4,13 +4,13 @@ import lombok.Getter; /** - * The dead letter queue (DLQ) topic + * The dead letter queue (DLQ) topic. */ @Getter @Builder public class DlqTopic { /** - * The DLQ topic name + * The DLQ topic name. */ private String name; } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/RestServiceResponse.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/RestServiceResponse.java index f4a00688..eb61978f 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/RestServiceResponse.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/RestServiceResponse.java @@ -6,7 +6,8 @@ import lombok.Setter; /** - * Rest service response + * Rest service response. + * * @param The body type */ @Getter @@ -15,12 +16,12 @@ @Builder public class RestServiceResponse { /** - * The HTTP status + * The HTTP status. */ private int status; /** - * The request body + * The request body. */ private T body; } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/TopologyExposeJsonModel.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/TopologyExposeJsonModel.java deleted file mode 100644 index 3bdd5788..00000000 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/TopologyExposeJsonModel.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.michelin.kstreamplify.model; - -import lombok.Getter; -import lombok.Setter; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * The topology expose class - */ -@Getter -@Setter -public class TopologyExposeJsonModel { - /** - * The input topics - */ - private Set inTopicNameList = new HashSet<>(); - - /** - * The output topics - */ - private Set outTopicNameList = new HashSet<>(); - - /** - * The state stores - */ - private Set streamStateStore = new HashSet<>(); - - /** - * The internal stream content - */ - private List internalStreamContent = new ArrayList<>(); - - /** - * The stream name - */ - private String streamName; -} diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/TopologyObject.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/TopologyObject.java deleted file mode 100644 index 1a8f1ee4..00000000 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/TopologyObject.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.michelin.kstreamplify.model; - -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; - -/** - * The topology class - */ -@Getter -@Setter -@NoArgsConstructor -@AllArgsConstructor -public class TopologyObject { - /** - * The topology type - */ 
- private TopologyObjectType type; - - /** - * The topology name - */ - private String objectName; -} diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/TopologyObjectType.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/TopologyObjectType.java deleted file mode 100644 index 532247f7..00000000 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/TopologyObjectType.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.michelin.kstreamplify.model; - -/** - * The topology type enum - */ -public enum TopologyObjectType { - /** - * The input topic type - */ - TOPIC_IN, - - /** - * The output topic type - */ - TOPIC_OUT, - - /** - * The stream type - */ - STREAM; -} diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/TopologyPart.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/TopologyPart.java deleted file mode 100644 index 37ff73fd..00000000 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/model/TopologyPart.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.michelin.kstreamplify.model; - -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; - -import java.util.ArrayList; -import java.util.List; - -/** - * The topology part class - */ -@Getter -@Setter -@NoArgsConstructor -public class TopologyPart { - /** - * The input element - */ - TopologyObject inputElement = new TopologyObject(); - - /** - * The sub element name - */ - TopologyObject subElementName = new TopologyObject(); - - /** - * The output element - */ - TopologyObject outputElement = new TopologyObject(); - - /** - * The transformation - */ - List detailedTransformation = new ArrayList<>(); -} diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/properties/PropertiesUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/properties/PropertiesUtils.java index 98e470a2..879e19e9 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/properties/PropertiesUtils.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/properties/PropertiesUtils.java @@ -1,30 +1,33 @@ package com.michelin.kstreamplify.properties; -import lombok.AccessLevel; -import lombok.NoArgsConstructor; -import org.apache.commons.lang3.StringUtils; -import org.yaml.snakeyaml.Yaml; +import static com.michelin.kstreamplify.constants.PropertyConstants.DEFAULT_PROPERTY_FILE; +import static com.michelin.kstreamplify.constants.PropertyConstants.KAFKA_PROPERTIES_PREFIX; +import static com.michelin.kstreamplify.constants.PropertyConstants.PROPERTY_SEPARATOR; import java.io.IOException; import java.io.InputStream; import java.util.LinkedHashMap; import java.util.Properties; - -import static com.michelin.kstreamplify.constants.PropertyConstants.*; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.yaml.snakeyaml.Yaml; /** - * Properties utils + * Properties utils. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class PropertiesUtils { /** - * Load the properties from the default properties file + * Load the properties from the default properties file. 
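A small sketch of how the two loaders exposed here combine; it assumes the default YAML properties file is on the classpath:

```java
import com.michelin.kstreamplify.properties.PropertiesUtils;
import java.util.Properties;

public class PropertiesUtilsSketch {
    public static Properties kafkaOnlyProperties() {
        // Load every property from the default file on the classpath...
        Properties all = PropertiesUtils.loadProperties();
        // ...then keep only the Kafka entries, with the Kafka prefix stripped.
        return PropertiesUtils.loadKafkaProperties(all);
    }
}
```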
+ * * @return The properties */ public static Properties loadProperties() { Yaml yaml = new Yaml(); - try (InputStream inputStream = PropertiesUtils.class.getClassLoader().getResourceAsStream(DEFAULT_PROPERTY_FILE)) { + try (InputStream inputStream = PropertiesUtils.class.getClassLoader() + .getResourceAsStream(DEFAULT_PROPERTY_FILE)) { LinkedHashMap propsMap = yaml.load(inputStream); return parsePropertiesMap(propsMap); } catch (IOException e) { @@ -33,7 +36,8 @@ public static Properties loadProperties() { } /** - * Get the Kafka properties only from the given properties + * Get the Kafka properties only from the given properties. + * * @param props The properties * @return The Kafka properties */ @@ -41,14 +45,16 @@ public static Properties loadKafkaProperties(Properties props) { Properties resultProperties = new Properties(); for (var prop : props.entrySet()) { if (StringUtils.contains(prop.getKey().toString(), KAFKA_PROPERTIES_PREFIX)) { - resultProperties.put(StringUtils.remove(prop.getKey().toString(), KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR), prop.getValue()); + resultProperties.put(StringUtils.remove(prop.getKey().toString(), + KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR), prop.getValue()); } } return resultProperties; } /** - * Parse a map into Properties + * Parse a map into Properties. + * * @param map The map * @return The properties */ @@ -57,9 +63,10 @@ private static Properties parsePropertiesMap(LinkedHashMap map) } /** - * Parse a given key - * @param key The key - * @param map The underlying map + * Parse a given key. + * + * @param key The key + * @param map The underlying map * @param props The properties * @return The properties */ diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/properties/RocksDBConfig.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/properties/RocksDbConfig.java similarity index 52% rename from kstreamplify-core/src/main/java/com/michelin/kstreamplify/properties/RocksDBConfig.java rename to kstreamplify-core/src/main/java/com/michelin/kstreamplify/properties/RocksDbConfig.java index 1cfff70f..85ce2884 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/properties/RocksDBConfig.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/properties/RocksDbConfig.java @@ -1,111 +1,102 @@ package com.michelin.kstreamplify.properties; import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; +import java.util.Map; import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.CompressionType; import org.rocksdb.Options; -import java.util.Map; - /** - * The RockDB configuration class + * The RockDB configuration class. */ -public class RocksDBConfig implements RocksDBConfigSetter { +public class RocksDbConfig implements RocksDBConfigSetter { /** - * The RocksDB cache size config key + * The RocksDB cache size config key. */ public static final String ROCKSDB_CACHE_SIZE_CONFIG = "rocksdb.config.cache.size"; /** - * The RocksDB write buffer size config key + * The RocksDB write buffer size config key. */ - public static final String ROCKSDB_WRITE_BUFFER_SIZE_CONFIG = "rocksdb.config.write.buffer.size"; + public static final String ROCKSDB_WRITE_BUFFER_SIZE_CONFIG = + "rocksdb.config.write.buffer.size"; /** - * The RocksDB block size config key + * The RocksDB block size config key. 
*/ public static final String ROCKSDB_BLOCK_SIZE_CONFIG = "rocksdb.config.block.size"; /** - * The RocksDB max write buffer config + * The RocksDB max write buffer config. */ public static final String ROCKSDB_MAX_WRITE_BUFFER_CONFIG = "rocksdb.config.max.write.buffer"; /** - * The RocksDB compression type config key + * The RocksDB compression type config key. */ public static final String ROCKSDB_COMPRESSION_TYPE_CONFIG = "rocksdb.config.compression.type"; /** - * The RocksDB cache index block enabled config + * The RocksDB cache index block enabled config. */ - public static final String ROCKSDB_CACHE_INDEX_BLOCK_ENABLED_CONFIG = "rocksdb.config.cache.index.block.enabled"; + public static final String ROCKSDB_CACHE_INDEX_BLOCK_ENABLED_CONFIG = + "rocksdb.config.cache.index.block.enabled"; /** - * One KB in B + * One KB in B. */ private static final long ONE_KB = 1024L; /** - * The RocksDB default cache size + * The RocksDB default cache size. */ public static final Long ROCKSDB_CACHE_SIZE_DEFAULT = 16 * ONE_KB * ONE_KB; /** - * The RocksDB default write buffer size + * The RocksDB default write buffer size. */ public static final Long ROCKSDB_WRITE_BUFFER_SIZE_DEFAULT = 4 * ONE_KB * ONE_KB; /** - * The RocksDB default block size + * The RocksDB default block size. */ public static final Long ROCKSDB_BLOCK_SIZE_DEFAULT = 4 * ONE_KB; /** - * The RocksDB default max write buffer + * The RocksDB default max write buffer. */ public static final Integer ROCKSDB_MAX_WRITE_BUFFER_DEFAULT = 2; /** - * The RocksDB default compression type + * The RocksDB default compression type. */ public static final String ROCKSDB_COMPRESSION_TYPE_DEFAULT = ""; /** - * The RocksDB default cache index block enabled + * The RocksDB default cache index block enabled. */ public static final Boolean ROCKSDB_CACHE_INDEX_BLOCK_ENABLED_DEFAULT = true; /** - * The RocksDB cache + * The RocksDB cache. */ private org.rocksdb.Cache cache = null; /** - * Set the RocksDB configuration + * Set the RocksDB configuration. + * * @param storeName The store name - * @param options The options - * @param configs The configs + * @param options The options + * @param configs The configs */ @Override - public void setConfig(final String storeName, final Options options, final Map configs) { - long blockCacheSize = KafkaStreamsExecutionContext.getProperties().containsKey(ROCKSDB_CACHE_SIZE_CONFIG) ? - Long.parseLong(KafkaStreamsExecutionContext.getProperties().getProperty(ROCKSDB_CACHE_SIZE_CONFIG)) : ROCKSDB_CACHE_SIZE_DEFAULT; - - long writeBufferSize = KafkaStreamsExecutionContext.getProperties().containsKey(ROCKSDB_WRITE_BUFFER_SIZE_CONFIG) ? - Long.parseLong(KafkaStreamsExecutionContext.getProperties().getProperty(ROCKSDB_WRITE_BUFFER_SIZE_CONFIG)) : ROCKSDB_WRITE_BUFFER_SIZE_DEFAULT; - - long blockSize = KafkaStreamsExecutionContext.getProperties().containsKey(ROCKSDB_BLOCK_SIZE_CONFIG) ? - Long.parseLong(KafkaStreamsExecutionContext.getProperties().getProperty(ROCKSDB_BLOCK_SIZE_CONFIG)) : ROCKSDB_BLOCK_SIZE_DEFAULT; - - int maxWriteBuffer = KafkaStreamsExecutionContext.getProperties().containsKey(ROCKSDB_MAX_WRITE_BUFFER_CONFIG) ? - Integer.parseInt(KafkaStreamsExecutionContext.getProperties().getProperty(ROCKSDB_MAX_WRITE_BUFFER_CONFIG)) : ROCKSDB_MAX_WRITE_BUFFER_DEFAULT; - - boolean cacheIndexBlock = KafkaStreamsExecutionContext.getProperties().containsKey(ROCKSDB_CACHE_INDEX_BLOCK_ENABLED_CONFIG) ? 
- Boolean.parseBoolean(KafkaStreamsExecutionContext.getProperties().getProperty(ROCKSDB_CACHE_INDEX_BLOCK_ENABLED_CONFIG)) : ROCKSDB_CACHE_INDEX_BLOCK_ENABLED_DEFAULT; - - String compressionType = KafkaStreamsExecutionContext.getProperties().getProperty(ROCKSDB_COMPRESSION_TYPE_CONFIG, ROCKSDB_COMPRESSION_TYPE_DEFAULT); + public void setConfig(final String storeName, final Options options, + final Map configs) { + long blockCacheSize = + KafkaStreamsExecutionContext.getProperties().containsKey(ROCKSDB_CACHE_SIZE_CONFIG) + ? Long.parseLong(KafkaStreamsExecutionContext.getProperties() + .getProperty(ROCKSDB_CACHE_SIZE_CONFIG)) : ROCKSDB_CACHE_SIZE_DEFAULT; if (cache == null) { cache = new org.rocksdb.LRUCache(blockCacheSize); @@ -113,11 +104,40 @@ public void setConfig(final String storeName, final Options options, final Map { - exchange.sendResponseHeaders(ProbeService.readinessProbe(kafkaStreamsInitializer).getStatus(), 0); + exchange.sendResponseHeaders( + ProbeService.readinessProbe(kafkaStreamsInitializer).getStatus(), 0); var output = exchange.getResponseBody(); output.close(); exchange.close(); @@ -56,13 +70,16 @@ private void readinessProbe(KafkaStreamsInitializer kafkaStreamsInitializer, Str /** - * Kubernetes' liveness probe - * @param kafkaStreamsInitializer The Kafka Streams initializer - * @param livenessPath The liveness path + * Kubernetes' liveness probe. + * + * @param kafkaStreamsInitializer The Kafka Streams initializer. + * @param livenessPath The liveness path. */ - private void livenessProbe(KafkaStreamsInitializer kafkaStreamsInitializer, String livenessPath) { + private void livenessProbe(KafkaStreamsInitializer kafkaStreamsInitializer, + String livenessPath) { server.createContext(livenessPath, (exchange -> { - exchange.sendResponseHeaders(ProbeService.livenessProbe(kafkaStreamsInitializer).getStatus(), 0); + exchange.sendResponseHeaders( + ProbeService.livenessProbe(kafkaStreamsInitializer).getStatus(), 0); var output = exchange.getResponseBody(); output.close(); exchange.close(); @@ -70,11 +87,13 @@ private void livenessProbe(KafkaStreamsInitializer kafkaStreamsInitializer, Stri } /** - * Expose the topology tree - * @param kafkaStreamsInitializer The Kafka Streams initializer - * @param exposeTopologyPath The expose topology path + * Expose the topology tree. + * + * @param kafkaStreamsInitializer The Kafka Streams initializer. + * @param exposeTopologyPath The expose topology path. */ - private void exposeTopology(KafkaStreamsInitializer kafkaStreamsInitializer, String exposeTopologyPath) { + private void exposeTopology(KafkaStreamsInitializer kafkaStreamsInitializer, + String exposeTopologyPath) { server.createContext(exposeTopologyPath, (exchange -> { var restServiceResponse = ProbeService.exposeTopology(kafkaStreamsInitializer); @@ -89,7 +108,8 @@ private void exposeTopology(KafkaStreamsInitializer kafkaStreamsInitializer, Str /** - * Callback to override in case of custom endpoint definition + * Callback to override in case of custom endpoint definition. 
+ * * @param kafkaStreamsInitializer The Kafka Streams initializer */ protected void endpointCaller(KafkaStreamsInitializer kafkaStreamsInitializer) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/services/ConvertTopology.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/services/ConvertTopology.java deleted file mode 100644 index 79283dea..00000000 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/services/ConvertTopology.java +++ /dev/null @@ -1,123 +0,0 @@ -package com.michelin.kstreamplify.services; - -import com.michelin.kstreamplify.model.TopologyExposeJsonModel; -import com.michelin.kstreamplify.model.TopologyObject; -import com.michelin.kstreamplify.model.TopologyObjectType; -import com.michelin.kstreamplify.model.TopologyPart; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyDescription; - -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * The convert topology class - */ -public class ConvertTopology { - private static final String SINK = "-SINK-"; - private static final String SOURCE = "-SOURCE-"; - - private ConvertTopology() { } - - /** - * Convert the Kafka Streams topology as REST JSON response - * @param streamName The Kafka Streams name - * @param topology The Kafka Streams topology - * @return The Kafka Streams topology as REST JSON response - */ - public static TopologyExposeJsonModel convertTopologyForRest(String streamName, Topology topology) { - var result = new TopologyExposeJsonModel(); - result.setStreamName(streamName); - for (TopologyDescription.Subtopology subTopology : topology.describe().subtopologies()) { - handleSubTopology(subTopology, result); - } - return result; - } - - private static void handleSubTopology(TopologyDescription.Subtopology subTopology, TopologyExposeJsonModel obj){ - Set nodeProcessed = new HashSet<>(); - for (TopologyDescription.Node node : subTopology.nodes()) { - if (!nodeProcessed.contains(node.name())){ - handleNode(nodeProcessed, obj, node, new TopologyPart()); - } - } - } - - private static void handleNode(Set nodeProcessed, TopologyExposeJsonModel obj, TopologyDescription.Node node, TopologyPart currentNodeAncestorsPath) { - nodeProcessed.add(node.name()); - List currentElements = currentNodeAncestorsPath.getDetailedTransformation(); - currentElements.add(node.name()); - currentNodeAncestorsPath.setDetailedTransformation(currentElements); - - TopologyObject elementName = currentNodeAncestorsPath.getSubElementName(); - - if (node.successors().size() > 1) { - var t = obj.getInternalStreamContent(); - - elementName.setObjectName(obj.getStreamName()+"\\r\\n"+"Element "+(obj.getInternalStreamContent().size()+1)); - elementName.setType(TopologyObjectType.STREAM); - currentNodeAncestorsPath.setSubElementName(elementName); - t.add(currentNodeAncestorsPath); - - obj.setInternalStreamContent(t); - currentNodeAncestorsPath = new TopologyPart(); - currentNodeAncestorsPath.setInputElement(elementName); - } - - if (node.successors().isEmpty()) { - var t = obj.getInternalStreamContent(); - - elementName.setObjectName(obj.getStreamName()+"\\r\\n"+"Element "+ (obj.getInternalStreamContent().size() + 1)); - elementName.setType(TopologyObjectType.STREAM); - currentNodeAncestorsPath.setSubElementName(elementName); - - TopologyObject elementNameSink = new TopologyObject(); - elementNameSink.setType(TopologyObjectType.TOPIC_OUT); - currentNodeAncestorsPath.setOutputElement(elementNameSink); - - if (node.name().contains(SINK)) { - 
TopologyDescription.Sink sink = (TopologyDescription.Sink)node; - elementNameSink.setObjectName(sink.topic()); - } - - t.add(currentNodeAncestorsPath); - - obj.setInternalStreamContent(t); - currentNodeAncestorsPath = new TopologyPart(); - currentNodeAncestorsPath.setInputElement(elementName); - } - - if (node.name().contains(SOURCE)) { - TopologyDescription.Source source = (TopologyDescription.Source)node; - var t = obj.getInTopicNameList(); - TopologyObject elementNameSource = new TopologyObject(); - elementNameSource.setType(TopologyObjectType.TOPIC_IN); - for (String topic : source.topicSet()){ - elementNameSource.setObjectName(topic); - t.add(topic); - } - currentNodeAncestorsPath.setInputElement(elementNameSource); - - obj.setInTopicNameList(t); - } else { - if (node.name().contains(SINK)) { - TopologyObject elementNameSink = new TopologyObject(); - elementNameSink.setType(TopologyObjectType.TOPIC_OUT); - - TopologyDescription.Sink sink = (TopologyDescription.Sink)node; - elementNameSink.setObjectName(sink.topic()); - var t = obj.getOutTopicNameList(); - t.add(sink.topic()); - obj.setOutTopicNameList(t); - currentNodeAncestorsPath.setOutputElement(elementNameSink); - } - } - - for (TopologyDescription.Node nodeBelow : node.successors()) { - handleNode(nodeProcessed, obj, nodeBelow, currentNodeAncestorsPath); - currentNodeAncestorsPath = new TopologyPart(); - currentNodeAncestorsPath.setInputElement(elementName); - } - } -} diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/services/ProbeService.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/services/ProbeService.java index 23ca9ab2..a52111f7 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/services/ProbeService.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/services/ProbeService.java @@ -1,8 +1,9 @@ package com.michelin.kstreamplify.services; +import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer; import com.michelin.kstreamplify.model.RestServiceResponse; -import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; +import java.net.HttpURLConnection; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -10,64 +11,86 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.internals.StreamThread; -import java.net.HttpURLConnection; - /** - * Kafka Streams probe service + * Kafka Streams probe service. */ @Slf4j @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class ProbeService { /** - * Kubernetes' readiness probe + * Kubernetes' readiness probe. 
+ * * @param kafkaStreamsInitializer The Kafka Streams initializer * @return An HTTP response based on the Kafka Streams state */ - public static RestServiceResponse readinessProbe(KafkaStreamsInitializer kafkaStreamsInitializer) { + public static RestServiceResponse readinessProbe( + KafkaStreamsInitializer kafkaStreamsInitializer) { if (kafkaStreamsInitializer.getKafkaStreams() != null) { log.debug("Kafka Stream \"{}\" state: {}", - KafkaStreamsExecutionContext.getProperties().getProperty(StreamsConfig.APPLICATION_ID_CONFIG), - kafkaStreamsInitializer.getKafkaStreams().state()); + KafkaStreamsExecutionContext.getProperties() + .getProperty(StreamsConfig.APPLICATION_ID_CONFIG), + kafkaStreamsInitializer.getKafkaStreams().state()); - if (kafkaStreamsInitializer.getKafkaStreams().state() == KafkaStreams.State.REBALANCING) { - long startingThreadCount = kafkaStreamsInitializer.getKafkaStreams().metadataForLocalThreads() + if (kafkaStreamsInitializer.getKafkaStreams().state() + == KafkaStreams.State.REBALANCING) { + long startingThreadCount = + kafkaStreamsInitializer.getKafkaStreams().metadataForLocalThreads() .stream() - .filter(t -> StreamThread.State.STARTING.name().compareToIgnoreCase(t.threadState()) == 0 || StreamThread.State.CREATED.name().compareToIgnoreCase(t.threadState()) == 0) + .filter(t -> StreamThread.State.STARTING.name() + .compareToIgnoreCase(t.threadState()) == 0 + || StreamThread.State.CREATED.name() + .compareToIgnoreCase(t.threadState()) == 0) .count(); - if (startingThreadCount == kafkaStreamsInitializer.getKafkaStreams().metadataForLocalThreads().size()) { - return RestServiceResponse.builder().status(HttpURLConnection.HTTP_NO_CONTENT).build(); + if (startingThreadCount + == kafkaStreamsInitializer.getKafkaStreams().metadataForLocalThreads().size()) { + return RestServiceResponse.builder() + .status(HttpURLConnection.HTTP_NO_CONTENT).build(); } } - return kafkaStreamsInitializer.getKafkaStreams().state().isRunningOrRebalancing() ? - RestServiceResponse.builder().status(HttpURLConnection.HTTP_OK).build() : RestServiceResponse.builder().status(HttpURLConnection.HTTP_UNAVAILABLE).build(); + return kafkaStreamsInitializer.getKafkaStreams().state().isRunningOrRebalancing() + ? RestServiceResponse.builder().status(HttpURLConnection.HTTP_OK).build() : + RestServiceResponse.builder().status(HttpURLConnection.HTTP_UNAVAILABLE) + .build(); } - return RestServiceResponse.builder().status(HttpURLConnection.HTTP_BAD_REQUEST).build(); + return RestServiceResponse.builder().status(HttpURLConnection.HTTP_BAD_REQUEST) + .build(); } /** - * Kubernetes' liveness probe + * Kubernetes' liveness probe. + * * @param kafkaStreamsInitializer The Kafka Streams initializer * @return An HTTP response based on the Kafka Streams state */ - public static RestServiceResponse livenessProbe(KafkaStreamsInitializer kafkaStreamsInitializer) { + public static RestServiceResponse livenessProbe( + KafkaStreamsInitializer kafkaStreamsInitializer) { if (kafkaStreamsInitializer.getKafkaStreams() != null) { - return kafkaStreamsInitializer.getKafkaStreams().state() != KafkaStreams.State.NOT_RUNNING ? RestServiceResponse.builder().status(HttpURLConnection.HTTP_OK).build() - : RestServiceResponse.builder().status(HttpURLConnection.HTTP_INTERNAL_ERROR).build(); + return kafkaStreamsInitializer.getKafkaStreams().state() + != KafkaStreams.State.NOT_RUNNING + ? 
RestServiceResponse.builder().status(HttpURLConnection.HTTP_OK).build() + : + RestServiceResponse.builder().status(HttpURLConnection.HTTP_INTERNAL_ERROR) + .build(); } - return RestServiceResponse.builder().status(HttpURLConnection.HTTP_NO_CONTENT).build(); + return RestServiceResponse.builder().status(HttpURLConnection.HTTP_NO_CONTENT) + .build(); } /** - * Get the Kafka Streams topology + * Get the Kafka Streams topology. + * * @param kafkaStreamsInitializer The Kafka Streams initializer * @return The Kafka Streams topology */ - public static RestServiceResponse exposeTopology(KafkaStreamsInitializer kafkaStreamsInitializer) { + public static RestServiceResponse exposeTopology( + KafkaStreamsInitializer kafkaStreamsInitializer) { if (kafkaStreamsInitializer.getTopology() != null) { - return RestServiceResponse.builder().status(HttpURLConnection.HTTP_OK).body(kafkaStreamsInitializer.getTopology().describe().toString()).build(); + return RestServiceResponse.builder().status(HttpURLConnection.HTTP_OK) + .body(kafkaStreamsInitializer.getTopology().describe().toString()).build(); } - return RestServiceResponse.builder().status(HttpURLConnection.HTTP_NO_CONTENT).build(); + return RestServiceResponse.builder().status(HttpURLConnection.HTTP_NO_CONTENT) + .build(); } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/SerdesUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/SerdesUtils.java index dc21e384..d7086849 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/SerdesUtils.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/SerdesUtils.java @@ -1,40 +1,45 @@ package com.michelin.kstreamplify.utils; -import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import org.apache.avro.specific.SpecificRecord; /** - * The Serdes utils class + * The Serdes utils class. */ public final class SerdesUtils { - private SerdesUtils() { } + private SerdesUtils() { + } /** - * Return a key serdes for a requested class + * Return a key serdes for a requested class. + * * @param The class of requested serdes * @return a serdes for requested class */ - public static SpecificAvroSerde getSerdesForKey() { + public static SpecificAvroSerde getSerdesForKey() { return getSerdes(true); } /** - * Return a value serdes for a requested class + * Return a value serdes for a requested class. + * * @param The class of requested serdes * @return a serdes for requested class */ - public static SpecificAvroSerde getSerdesForValue() { + public static SpecificAvroSerde getSerdesForValue() { return getSerdes(false); } /** - * Return a serdes for a requested class + * Return a serdes for a requested class. 
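For illustration, a minimal usage sketch of the serdes helpers touched in the surrounding SerdesUtils hunk (not part of the patch): the topic name is a placeholder, KafkaTestAvro is borrowed from the test sources elsewhere in this patch purely as an example record, and the serdes configuration is assumed to have been registered in KafkaStreamsExecutionContext beforehand.

import com.michelin.kstreamplify.avro.KafkaTestAvro;
import com.michelin.kstreamplify.utils.SerdesUtils;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

class SerdesUtilsUsageSketch {
    // Consume a topic with a String key and an Avro value serde obtained from the helper above.
    KStream<String, KafkaTestAvro> example(StreamsBuilder streamsBuilder) {
        return streamsBuilder.stream(
            "my.input.topic",
            Consumed.with(Serdes.String(), SerdesUtils.<KafkaTestAvro>getSerdesForValue()));
    }
}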
+ * * @param isSerdeForKey Is the serdes for a key or a value - * @param The class of requested serdes + * @param The class of requested serdes * @return a serdes for requested class */ - private static SpecificAvroSerde getSerdes(boolean isSerdeForKey) { + private static SpecificAvroSerde getSerdes( + boolean isSerdeForKey) { SpecificAvroSerde serde = new SpecificAvroSerde<>(); serde.configure(KafkaStreamsExecutionContext.getSerdesConfig(), isSerdeForKey); return serde; diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicUtils.java index 90c6d722..3e56e3a7 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicUtils.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicUtils.java @@ -1,11 +1,14 @@ package com.michelin.kstreamplify.utils; -import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; +import static com.michelin.kstreamplify.constants.PropertyConstants.PREFIX_PROPERTY_NAME; +import static com.michelin.kstreamplify.constants.PropertyConstants.PROPERTY_SEPARATOR; +import static com.michelin.kstreamplify.constants.PropertyConstants.REMAP_PROPERTY_NAME; +import static com.michelin.kstreamplify.constants.PropertyConstants.TOPIC_PROPERTY_NAME; -import static com.michelin.kstreamplify.constants.PropertyConstants.*; +import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; /** - * The topic utils class + * The topic utils class. */ public final class TopicUtils { private TopicUtils() { @@ -42,17 +45,17 @@ public static String prefixAndDynamicRemap(String topicName, String prefixProper // Check for dynamic remap in properties String resultTopicName = properties.getProperty( - TOPIC_PROPERTY_NAME - + PROPERTY_SEPARATOR - + REMAP_PROPERTY_NAME - + PROPERTY_SEPARATOR - + topicName, - topicName); + TOPIC_PROPERTY_NAME + + PROPERTY_SEPARATOR + + REMAP_PROPERTY_NAME + + PROPERTY_SEPARATOR + + topicName, + topicName); // check if topic prefix property exists - String prefix = properties.getProperty(PREFIX_PROPERTY_NAME + PROPERTY_SEPARATOR + prefixPropertyKey, ""); + String prefix = + properties.getProperty(PREFIX_PROPERTY_NAME + PROPERTY_SEPARATOR + prefixPropertyKey, + ""); return prefix.concat(resultTopicName); } - - } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java index 69e3902c..3c0bab83 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java @@ -1,6 +1,8 @@ package com.michelin.kstreamplify.utils; +import static com.michelin.kstreamplify.constants.PropertyConstants.SELF; + import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; @@ -15,10 +17,8 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueStore; -import static com.michelin.kstreamplify.constants.PropertyConstants.SELF; - /** - * Wrapper class for simplifying topics interactions and their behaviors + * Wrapper class for simplifying topics interactions and their behaviors. * * @param The model used as the key avro of the topic. Can be String (Recommended) * @param The model used as the value avro of the topic. 
@@ -26,12 +26,13 @@ @AllArgsConstructor(access = AccessLevel.PUBLIC) public final class TopicWithSerde { /** - * Name of the topic + * Name of the topic. */ private final String topicName; /** - *

Name of the property key defined under kafka.properties.prefix. Used to prefix the topicName dynamically at runtime.
+     * Name of the property key defined under kafka.properties.prefix.
+     * Used to prefix the topicName dynamically at runtime.
      * For instance, with the given following configuration :
      * {@code
      * kafka:
@@ -39,25 +40,26 @@ public final class TopicWithSerde {
      *     prefix:
      *       nsKey: "myNamespacePrefix."
      * }
-     * If the topic name is {@code myTopic} , at stream initialization the topic name wil resolve to {@code myNamespacePrefix.myTopic}
+     * If the topic name is {@code myTopic} , at stream initialization the topic name wil resolve
+     * to {@code myNamespacePrefix.myTopic}
      */
     private final String prefixPropertyKey;

     /**
-     * Key serde for the topic
+     * Key serde for the topic.
      */
     @Getter
     private final Serde<K> keySerde;

     /**
-     * Value serde for the topic
+     * Value serde for the topic.
      */
     @Getter
     private final Serde<V> valueSerde;

     /**
-     * Additional constructor which uses default parameter "self" for prefixPropertyKey
+     * Additional constructor which uses default parameter "self" for prefixPropertyKey.
      *
      * For instance, with the given following configuration :
      * {@code
@@ -66,7 +68,8 @@ public final class TopicWithSerde {
      *     prefix:
      *       self: "myNamespacePrefix."
      * }
-     * If the topic name is {@code myTopic} , at stream initialization the topic name wil resolve to {@code myNamespacePrefix.myTopic}
+     * If the topic name is {@code myTopic} , at stream initialization the topic name wil resolve
+     * to {@code myNamespacePrefix.myTopic}

* * @param topicName Name of the topic * @param keySerde Key serde for the topic @@ -80,16 +83,17 @@ public TopicWithSerde(String topicName, Serde keySerde, Serde valueSerde) } /** - * Get the un-prefixed name of the Topic for specific usage + * Get the un-prefixed name of the Topic for specific usage. * - * @return The name of the topic, as defined during initialization, without ns4kafka prefixing + * @return The name of the topic, as defined during initialization */ public String getUnPrefixedName() { return topicName; } /** - * Override of the toString method, dynamically builds the topicName based on springBoot properties for environment/application + * Override of the toString method, dynamically builds the topicName based on springBoot + * properties for environment/application. * * @return The prefixed name of the topic */ @@ -99,7 +103,8 @@ public String toString() { } /** - * Wrapper for the .stream method of KafkaStreams. Allows simple usage of a topic with type inference + * Wrapper for the .stream method of KafkaStreams. + * Allows simple usage of a topic with type inference * * @param sb The streamsBuilder * @return a Kstream from the given topic @@ -116,7 +121,9 @@ public KStream stream(StreamsBuilder sb) { * @return a Ktable from the given topic */ public KTable table(StreamsBuilder sb, String storeName) { - return sb.table(this.toString(), Consumed.with(keySerde, valueSerde), Materialized.>as(storeName).withKeySerde(keySerde).withValueSerde(valueSerde)); + return sb.table(this.toString(), Consumed.with(keySerde, valueSerde), + Materialized.>as(storeName).withKeySerde(keySerde) + .withValueSerde(valueSerde)); } /** @@ -127,7 +134,9 @@ public KTable table(StreamsBuilder sb, String storeName) { * @return a GlobalKtable from the given topic */ public GlobalKTable globalTable(StreamsBuilder sb, String storeName) { - return sb.globalTable(this.toString(), Consumed.with(keySerde, valueSerde), Materialized.>as(storeName).withKeySerde(keySerde).withValueSerde(valueSerde)); + return sb.globalTable(this.toString(), Consumed.with(keySerde, valueSerde), + Materialized.>as(storeName).withKeySerde(keySerde) + .withValueSerde(valueSerde)); } /** diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/WindowStateStoreUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/WindowStateStoreUtils.java index 9ae54646..2cc7192e 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/WindowStateStoreUtils.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/WindowStateStoreUtils.java @@ -1,18 +1,19 @@ package com.michelin.kstreamplify.utils; -import org.apache.kafka.streams.state.WindowStore; - import java.time.Duration; import java.time.Instant; +import org.apache.kafka.streams.state.WindowStore; /** - * The window state store utils + * The window state store utils. */ public final class WindowStateStoreUtils { - private WindowStateStoreUtils() { } + private WindowStateStoreUtils() { + } /** - * Put the key/value into the state store + * Put the key/value into the state store. + * * @param stateStore The stateStore * @param key The key * @param value The value @@ -24,7 +25,8 @@ public static void put(WindowStore stateStore, K key, V value) { } /** - * Get the value by the key from the state store + * Get the value by the key from the state store. 
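For illustration, a minimal usage sketch of the TopicWithSerde wrapper reworked in the hunks above (not part of the patch): the topic name and record type are placeholders, and kafka.properties.prefix.self is assumed to be set to "myNamespacePrefix." as in the Javadoc example.

import com.michelin.kstreamplify.avro.KafkaTestAvro;
import com.michelin.kstreamplify.utils.SerdesUtils;
import com.michelin.kstreamplify.utils.TopicWithSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

class TopicWithSerdeUsageSketch {
    KStream<String, KafkaTestAvro> example(StreamsBuilder streamsBuilder) {
        // Uses the additional constructor: the prefix property key defaults to "self".
        TopicWithSerde<String, KafkaTestAvro> inputTopic = new TopicWithSerde<>(
            "myTopic", Serdes.String(), SerdesUtils.<KafkaTestAvro>getSerdesForValue());

        String unprefixed = inputTopic.getUnPrefixedName(); // "myTopic"
        String resolved = inputTopic.toString();            // "myNamespacePrefix.myTopic" at stream initialization

        return inputTopic.stream(streamsBuilder);           // KStream with inferred key/value types
    }
}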
+ * * @param stateStore The stateStore * @param key The key * @param retentionDays The delay of retention @@ -33,7 +35,9 @@ public static void put(WindowStore stateStore, K key, V value) { * @return The last value inserted in the state store for the key */ public static V get(WindowStore stateStore, K key, int retentionDays) { - var resultIterator = stateStore.backwardFetch(key, Instant.now().minus(Duration.ofDays(retentionDays)), Instant.now()); + var resultIterator = + stateStore.backwardFetch(key, Instant.now().minus(Duration.ofDays(retentionDays)), + Instant.now()); if (resultIterator != null && resultIterator.hasNext()) { return resultIterator.next().value; } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/converter/AvroToJsonConverterTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/converter/AvroToJsonConverterTest.java index d34b894f..6dc908ea 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/converter/AvroToJsonConverterTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/converter/AvroToJsonConverterTest.java @@ -1,20 +1,19 @@ package com.michelin.kstreamplify.converter; +import static org.junit.jupiter.api.Assertions.assertEquals; + import com.google.gson.Gson; import com.google.gson.JsonObject; import com.michelin.kstreamplify.avro.KafkaTestAvro; import com.michelin.kstreamplify.avro.MapElement; import com.michelin.kstreamplify.avro.SubKafkaTestAvro; import com.michelin.kstreamplify.avro.SubSubKafkaTestAvro; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.Test; - import java.math.BigDecimal; import java.time.Instant; import java.util.List; import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; @Slf4j class AvroToJsonConverterTest { @@ -26,14 +25,18 @@ void shouldConvertAvroToJson() { var gson = new Gson(); var jsonObject = gson.fromJson(jsonString, JsonObject.class); - + assertEquals("false", jsonObject.get("booleanField").getAsString()); assertEquals("1970-01-01T00:00:00.001Z", jsonObject.get("dateField").getAsString()); assertEquals("10", jsonObject.get("quantityField").getAsString()); assertEquals("test", jsonObject.get("stringField").getAsString()); - assertEquals("1970-01-01T00:00:00.002Z", jsonObject.getAsJsonArray("split").get(0).getAsJsonObject().getAsJsonArray("subSplit").get(0).getAsJsonObject().get("subSubDateField").getAsString()); - assertEquals("1970-01-01T00:00:00.003Z", jsonObject.getAsJsonObject("members").getAsJsonObject("key1").get("mapDateField").getAsString()); + assertEquals("1970-01-01T00:00:00.002Z", + jsonObject.getAsJsonArray("split").get(0).getAsJsonObject().getAsJsonArray("subSplit") + .get(0).getAsJsonObject().get("subSubDateField").getAsString()); + assertEquals("1970-01-01T00:00:00.003Z", + jsonObject.getAsJsonObject("members").getAsJsonObject("key1").get("mapDateField") + .getAsString()); assertEquals("val1", jsonObject.getAsJsonObject("membersString").get("key1").getAsString()); assertEquals("val1", jsonObject.getAsJsonArray("listString").get(0).getAsString()); assertEquals("val2", jsonObject.getAsJsonArray("listString").get(1).getAsString()); @@ -43,25 +46,25 @@ void shouldConvertAvroToJson() { private KafkaTestAvro getKafkaTest() { return KafkaTestAvro.newBuilder() - .setStringField("test") - .setDateField(Instant.ofEpochMilli(1)) - .setQuantityField(BigDecimal.TEN) - .setMembers(Map.of("key1", MapElement.newBuilder() - 
.setMapDateField(Instant.ofEpochMilli(3)) - .setMapQuantityField(BigDecimal.ONE) - .build())) - .setMembersString(Map.of("key1","val1")) - .setListString(List.of("val1","val2")) - .setSplit(List.of( - SubKafkaTestAvro.newBuilder() - .setSubField("subTest") - .setSubSplit(List.of( - SubSubKafkaTestAvro.newBuilder() - .setSubSubField("subSubTest") - .setSubSubDateField(Instant.ofEpochMilli(2)) - .setSubSubIntField(8) - .build())) - .build())) - .build(); + .setStringField("test") + .setDateField(Instant.ofEpochMilli(1)) + .setQuantityField(BigDecimal.TEN) + .setMembers(Map.of("key1", MapElement.newBuilder() + .setMapDateField(Instant.ofEpochMilli(3)) + .setMapQuantityField(BigDecimal.ONE) + .build())) + .setMembersString(Map.of("key1", "val1")) + .setListString(List.of("val1", "val2")) + .setSplit(List.of( + SubKafkaTestAvro.newBuilder() + .setSubField("subTest") + .setSubSplit(List.of( + SubSubKafkaTestAvro.newBuilder() + .setSubSubField("subSubTest") + .setSubSubDateField(Instant.ofEpochMilli(2)) + .setSubSubIntField(8) + .build())) + .build())) + .build(); } } diff --git a/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/initializer/SpringKafkaStreamsInitializer.java b/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/initializer/SpringKafkaStreamsInitializer.java index aa78a951..b3cb5367 100644 --- a/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/initializer/SpringKafkaStreamsInitializer.java +++ b/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/initializer/SpringKafkaStreamsInitializer.java @@ -13,41 +13,41 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; -import java.io.IOException; - /** - * The Kafka Streams initializer class + * The Kafka Streams initializer class. */ @Slf4j @Component @ConditionalOnBean(KafkaStreamsStarter.class) -public class SpringKafkaStreamsInitializer extends KafkaStreamsInitializer implements ApplicationRunner { +public class SpringKafkaStreamsInitializer extends KafkaStreamsInitializer + implements ApplicationRunner { /** - * The application context + * The application context. */ @Autowired private ConfigurableApplicationContext applicationContext; /** - * The server port + * The server port. */ @Value("${server.port:8080}") private int springBootServerPort; /** - * The Kafka properties + * The Kafka properties. */ @Autowired private KafkaProperties springBootKafkaProperties; /** - * The Kafka Streams starter + * The Kafka Streams starter. */ @Autowired private KafkaStreamsStarter kafkaStreamsStarter; /** - * Run method + * Run method. 
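As a hedged sketch of how the Spring wiring in the surrounding hunk is typically consumed (illustrative, not part of the patch): declaring a KafkaStreamsStarter bean is what the @ConditionalOnBean condition keys on; the topology(StreamsBuilder) override is assumed from the library's starter contract, and the topic names are placeholders.

import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.springframework.stereotype.Component;

@Component
public class MyKafkaStreamsStarter extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        // Build the processing topology here, e.g. from streamsBuilder.stream("my.input.topic").
    }

    @Override
    public String dlqTopic() {
        return "my.dlq.topic";
    }

    @Override
    public void onStart(KafkaStreams kafkaStreams) {
        // Optional hook (see the KafkaStreamsStarter hunk at the top of this patch).
    }
}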
+ * * @param args the program arguments */ @Override @@ -56,7 +56,7 @@ public void run(ApplicationArguments args) { } /** - * ${@inheritDoc} + * {@inheritDoc} */ @Override protected void initHttpServer() { @@ -64,7 +64,7 @@ protected void initHttpServer() { } /** - * ${@inheritDoc} + * {@inheritDoc} */ @Override protected void initProperties() { @@ -74,16 +74,17 @@ protected void initProperties() { } /** - * ${@inheritDoc} + * {@inheritDoc} */ @Override - protected StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse onStreamsUncaughtException(Throwable exception) { + protected StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse onStreamsUncaughtException( + Throwable exception) { closeApplicationContext(); return super.onStreamsUncaughtException(exception); } /** - * ${@inheritDoc} + * {@inheritDoc} */ @Override protected void onStateChange(KafkaStreams.State newState, KafkaStreams.State oldState) { @@ -93,7 +94,7 @@ protected void onStateChange(KafkaStreams.State newState, KafkaStreams.State old } /** - * Close the application context + * Close the application context. */ private void closeApplicationContext() { if (applicationContext != null) { diff --git a/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/properties/KafkaProperties.java b/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/properties/KafkaProperties.java index f0544a6d..04057298 100644 --- a/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/properties/KafkaProperties.java +++ b/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/properties/KafkaProperties.java @@ -1,16 +1,15 @@ package com.michelin.kstreamplify.properties; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - /** - * The Kafka properties class + * The Kafka properties class. */ @Getter @Setter @@ -18,12 +17,13 @@ @ConfigurationProperties(prefix = "kafka") public class KafkaProperties { /** - * The Kafka properties + * The Kafka properties. */ private final Map properties = new HashMap<>(); /** - * Return the Kafka properties as {@link java.util.Properties} + * Return the Kafka properties as {@link java.util.Properties}. + * * @return The Kafka properties */ public Properties asProperties() { diff --git a/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/rest/SpringProbeController.java b/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/rest/SpringProbeController.java index 95d4de0d..4bdd79d2 100644 --- a/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/rest/SpringProbeController.java +++ b/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/rest/SpringProbeController.java @@ -11,19 +11,20 @@ import org.springframework.web.bind.annotation.RestController; /** - * Spring Boot probe controller + * Spring Boot probe controller. */ @RestController @ConditionalOnBean(KafkaStreamsStarter.class) public class SpringProbeController { /** - * The Kafka Streams initializer + * The Kafka Streams initializer. */ @Autowired private SpringKafkaStreamsInitializer kafkaStreamsInitializer; /** - * Readiness Kubernetes probe endpoint + * Readiness Kubernetes probe endpoint. 
+ * * @return An HTTP response based on the Kafka Streams state */ @GetMapping("/${readiness_path:ready}") @@ -32,7 +33,8 @@ public ResponseEntity readinessProbe() { } /** - * Liveness Kubernetes probe endpoint + * Liveness Kubernetes probe endpoint. + * * @return An HTTP response based on the Kafka Streams state */ @GetMapping("/${liveness_path:liveness}") @@ -41,7 +43,8 @@ public ResponseEntity livenessProbe() { } /** - * Get the Kafka Streams topology + * Get the Kafka Streams topology. + * * @return The Kafka Streams topology */ @GetMapping("/${expose_topology_path:topology}") @@ -50,11 +53,13 @@ public ResponseEntity exposeTopology() { } /** - * Convert the probe service response into an HTTP response entity + * Convert the probe service response into an HTTP response entity. + * * @param serviceResponse The probe service response * @return An HTTP response */ - private static ResponseEntity convertToResponseEntity(RestServiceResponse serviceResponse) { + private static ResponseEntity convertToResponseEntity( + RestServiceResponse serviceResponse) { return ResponseEntity.status(serviceResponse.getStatus()).body(serviceResponse.getBody()); } } diff --git a/pom.xml b/pom.xml index 41fe3e30..6d6a768a 100644 --- a/pom.xml +++ b/pom.xml @@ -1,313 +1,344 @@ - 4.0.0 - pom + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 + pom - com.michelin - kstreamplify - 0.1.1-SNAPSHOT - kstreamplify - Kstreamplify is a Java library that brings new features on top of Kafka Streams. - https://github.com/michelin/kstreamplify + com.michelin + kstreamplify + 0.1.1-SNAPSHOT + kstreamplify + Kstreamplify is a Java library that brings new features on top of Kafka Streams. + https://github.com/michelin/kstreamplify - - - The Apache License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0.txt - - + + + The Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + - - - ossrh - https://s01.oss.sonatype.org/content/repositories/snapshots - - - ossrh - https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/ - - + + + ossrh + https://s01.oss.sonatype.org/content/repositories/snapshots + + + ossrh + https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/ + + - - - sebastienviale - Sebastien Viale - dif-hap-dev@michelin.com - https://github.com/sebastienviale - Michelin - - Developer - - - - clvacher - Clement Vacher - dif-hap-dev@michelin.com - https://github.com/clvacher - CGI - - Developer - - - - alexbosch3000 - Alexandre Bosch - dif-hap-dev@michelin.com - https://github.com/alexbosch3000 - Michelin - - Developer - - - - clvacher - Clement Vacher - dif-hap-dev@michelin.com - https://github.com/clvacher - CGI - - Developer - - - - loicgreffier - Loïc Greffier - dif-hap-dev@michelin.com - https://github.com/loicgreffier - Michelin - - Developer - - - - adriencalime - Adrien Calime - dif-hap-dev@michelin.com - https://github.com/adriencalime - Michelin - - Developer - - - + + + sebastienviale + Sebastien Viale + dif-hap-dev@michelin.com + https://github.com/sebastienviale + Michelin + + Developer + + + + clvacher + Clement Vacher + dif-hap-dev@michelin.com + https://github.com/clvacher + CGI + + Developer + + + + alexbosch3000 + Alexandre Bosch + dif-hap-dev@michelin.com + https://github.com/alexbosch3000 + Michelin + + Developer + + + + clvacher + Clement Vacher + dif-hap-dev@michelin.com + https://github.com/clvacher + CGI + + Developer + + + + loicgreffier + Loïc Greffier + dif-hap-dev@michelin.com + 
https://github.com/loicgreffier + Michelin + + Developer + + + + adriencalime + Adrien Calime + dif-hap-dev@michelin.com + https://github.com/adriencalime + Michelin + + Developer + + + - - - confluent - https://packages.confluent.io/maven/ - - + + + confluent + https://packages.confluent.io/maven/ + + - - scm:git:https://github.com/michelin/kstreamplify.git - scm:git:https://github.com/michelin/kstreamplify.git - scm:git:https://github.com/michelin/kstreamplify.git - + + scm:git:https://github.com/michelin/kstreamplify.git + scm:git:https://github.com/michelin/kstreamplify.git + scm:git:https://github.com/michelin/kstreamplify.git + - + kstreamplify-core - kstreamplify-spring-boot - kstreamplify-core-test - + kstreamplify-spring-boot + kstreamplify-core-test + - - 1.11.2 - 2.13.0 - 3.13.0 - 2.10.1 - 17 - 5.10.0 - 3.4.0 - 7.5.0 - 1.18.28 - 17 - 17 - 3.1.3 - michelin - michelin_kstreamplify - ${project.artifactId} - https://sonarcloud.io - + + 1.11.2 + 2.13.0 + 3.13.0 + 2.10.1 + 17 + 5.10.0 + 3.4.0 + 7.5.0 + 1.18.28 + 17 + 17 + 3.1.3 + michelin + michelin_kstreamplify + ${project.artifactId} + https://sonarcloud.io + - - - org.apache.kafka - kafka-streams - ${kafka.version} - + + + org.apache.kafka + kafka-streams + ${kafka.version} + - - org.apache.kafka - kafka-streams-test-utils - ${kafka.version} - + + org.apache.kafka + kafka-streams-test-utils + ${kafka.version} + - - io.confluent - kafka-streams-avro-serde - ${kafka-streams-avro-serde.version} - + + io.confluent + kafka-streams-avro-serde + ${kafka-streams-avro-serde.version} + - - org.projectlombok - lombok - ${lombok.version} - provided - + + org.projectlombok + lombok + ${lombok.version} + provided + - - commons-io - commons-io - ${commons-io.version} - + + commons-io + commons-io + ${commons-io.version} + - - org.junit.jupiter - junit-jupiter - ${junit-jupiter.version} - + + org.junit.jupiter + junit-jupiter + ${junit-jupiter.version} + - - com.google.code.gson - gson - ${gson.version} - + + com.google.code.gson + gson + ${gson.version} + - - org.apache.commons - commons-lang3 - ${commons-lang3.version} - + + org.apache.commons + commons-lang3 + ${commons-lang3.version} + - + - - - - org.apache.maven.plugins - maven-source-plugin - 3.3.0 - - - attach-sources - verify - - jar-no-fork - - - - + + + + org.apache.maven.plugins + maven-source-plugin + 3.3.0 + + + attach-sources + verify + + jar-no-fork + + + + - - org.apache.maven.plugins - maven-javadoc-plugin - 3.5.0 - - - attach-javadocs - verify - - jar - - - - + + org.apache.maven.plugins + maven-javadoc-plugin + 3.5.0 + + + attach-javadocs + verify + + jar + + + + - - org.apache.maven.plugins - maven-surefire-plugin - 3.1.2 - - - org.junit.jupiter - junit-jupiter-engine - ${junit-jupiter.version} - - - + + org.apache.maven.plugins + maven-surefire-plugin + 3.1.2 + + + org.junit.jupiter + junit-jupiter-engine + ${junit-jupiter.version} + + + - - org.jacoco - jacoco-maven-plugin - 0.8.10 - - - jacoco-initialize - - prepare-agent - - - - report - prepare-package - - report - - - - post-unit-test - test - - report - - - - target/jacoco.exec - - target/jacoco-ut - - - - - + + org.jacoco + jacoco-maven-plugin + 0.8.10 + + + jacoco-initialize + + prepare-agent + + + + report + prepare-package + + report + + + + post-unit-test + test + + report + + + + target/jacoco.exec + + target/jacoco-ut + + + + - - - - org.apache.avro - avro-maven-plugin - ${avro.version} - - - generate-sources - - schema - - - String - true - ${project.basedir}/src/main/avro/ - 
${project.basedir}/target/generated-sources - - - - - - - + + org.apache.maven.plugins + maven-checkstyle-plugin + 3.3.0 + + + com.puppycrawl.tools + checkstyle + 10.12.3 + + + + .checkstyle/checkstyle.xml + + ${project.build.sourceDirectory} + ${project.build.testSourceDirectory} + + info + true + + + + check-style + verify + + check + + + + + - - - sign - - - - org.apache.maven.plugins - maven-gpg-plugin - 3.1.0 - - - sign-artifacts - verify - - sign - - - - - - - - + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + String + true + ${project.basedir}/src/main/avro/ + ${project.basedir}/target/generated-sources + + + + + + + + + + + sign + + + + org.apache.maven.plugins + maven-gpg-plugin + 3.1.0 + + + sign-artifacts + verify + + sign + + + + + + + +