diff --git a/.checkstyle/checkstyle.xml b/.checkstyle/checkstyle.xml
new file mode 100644
index 00000000..32c0f31e
--- /dev/null
+++ b/.checkstyle/checkstyle.xml
@@ -0,0 +1,382 @@
+<!-- Checkstyle rule definitions (the XML content of this file was not preserved in this excerpt) -->
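For reference, a Checkstyle configuration is an XML tree of `module` elements rooted at `Checker`. The sketch below is illustrative only, not the actual 382-line kstreamplify rule set; judging by the reformatting in this PR (100-character lines, static imports first, javadoc sentences ending with a period), the real configuration appears close to the standard Google checks:

```xml
<?xml version="1.0"?>
<!DOCTYPE module PUBLIC
    "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
    "https://checkstyle.org/dtds/configuration_1_3.dtd">
<module name="Checker">
    <!-- Checker-level check: maximum line length -->
    <module name="LineLength">
        <property name="max" value="100"/>
    </module>
    <module name="TreeWalker">
        <!-- TreeWalker-level checks operating on the parsed AST -->
        <module name="UnusedImports"/>
        <module name="CustomImportOrder"/>
        <module name="JavadocStyle"/>
    </module>
</module>
```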
diff --git a/.github/workflows/on_pull_request.yml b/.github/workflows/on_pull_request.yml
index 9cd5c772..32a5d5df 100644
--- a/.github/workflows/on_pull_request.yml
+++ b/.github/workflows/on_pull_request.yml
@@ -20,6 +20,9 @@ jobs:
distribution: 'temurin'
cache: maven
+ - name: Check Style
+ run: mvn checkstyle:check
+
- name: Build
run: mvn clean compile
diff --git a/.github/workflows/on_push_main.yml b/.github/workflows/on_push_main.yml
index accf452e..885d56a8 100644
--- a/.github/workflows/on_push_main.yml
+++ b/.github/workflows/on_push_main.yml
@@ -25,6 +25,9 @@ jobs:
gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }}
gpg-passphrase: MAVEN_GPG_PASSPHRASE
+ - name: Check Style
+ run: mvn checkstyle:check
+
- name: Build
id: build
run: |
diff --git a/.readme/contributing/check_style.png b/.readme/contributing/check_style.png
new file mode 100644
index 00000000..9db02b4c
Binary files /dev/null and b/.readme/contributing/check_style.png differ
diff --git a/.readme/contributing/reformat_code.png b/.readme/contributing/reformat_code.png
new file mode 100644
index 00000000..31bf239b
Binary files /dev/null and b/.readme/contributing/reformat_code.png differ
diff --git a/.readme/contributing/save_actions.png b/.readme/contributing/save_actions.png
new file mode 100644
index 00000000..cfbb7a63
Binary files /dev/null and b/.readme/contributing/save_actions.png differ
diff --git a/.readme/contributing/scan.png b/.readme/contributing/scan.png
new file mode 100644
index 00000000..c099146d
Binary files /dev/null and b/.readme/contributing/scan.png differ
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 96c59105..b997a970 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -35,7 +35,32 @@ In general, we follow the ["fork-and-pull" Git workflow](https://github.com/susa
- Push changes to your fork
- Open a PR in our repository targeting master and follow the PR template so that we can efficiently review the changes.
-## Styleguides
+## Style Guide
+
+### Code Style
+
+We maintain a consistent code style using Checkstyle.
+
+#### IntelliJ
+
+To ensure code style consistency in IntelliJ, follow these steps:
+
+1. Install the [CheckStyle-IDEA plugin](https://plugins.jetbrains.com/plugin/1065-checkstyle-idea).
+2. Create a new CheckStyle configuration for Kstreamplify based on the code style configuration located in the `.checkstyle` folder. Configure it as follows:
+
+![check_style.png](.readme%2Fcontributing%2Fcheck_style.png)
+
+3. Enable the "Reformat code" and "Optimize imports" options in the save actions:
+
+![save_actions.png](.readme%2Fcontributing%2Fsave_actions.png)
+
+4. Reformat your code with the Checkstyle configuration:
+
+![reformat_code.png](.readme%2Fcontributing%2Freformat_code.png)
+
+5. Before committing your changes, ensure your contribution doesn't introduce any problems by running a scan:
+
+![scan.png](.readme%2Fcontributing%2Fscan.png)
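+
+You can also run the same check locally with `mvn checkstyle:check`, which is what the CI workflows run. A minimal sketch of the `maven-checkstyle-plugin` wiring this relies on (illustrative; the actual `pom.xml` configuration may differ) is:
+
+```xml
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-checkstyle-plugin</artifactId>
+    <configuration>
+        <configLocation>.checkstyle/checkstyle.xml</configLocation>
+        <consoleOutput>true</consoleOutput>
+        <failsOnError>true</failsOnError>
+    </configuration>
+</plugin>
+```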
### Git Commit Messages
diff --git a/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java b/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java
index 1496632f..51ab78d2 100644
--- a/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java
+++ b/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java
@@ -7,6 +7,12 @@
import com.michelin.kstreamplify.utils.TopicWithSerde;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@@ -16,32 +22,25 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.time.Instant;
-import java.util.Collections;
-import java.util.Properties;
-
/**
- * The main test class to extend to execute unit tests on topology
+ * The main test class to extend to execute unit tests on topology.
* It provides a {@link TopologyTestDriver} and a {@link TestOutputTopic} for the DLQ
*/
public abstract class KafkaStreamsStarterTest {
private static final String STATE_DIR = "/tmp/kafka-streams/";
/**
- * The topology test driver
+ * The topology test driver.
*/
protected TopologyTestDriver testDriver;
/**
- * The dlq topic, initialized in {@link #generalSetUp()}
+ * The dlq topic, initialized in {@link #generalSetUp()}.
*/
protected TestOutputTopic<String, KafkaError> dlqTopic;
/**
- * Set up topology test driver
+ * Set up topology test driver.
*/
@BeforeEach
void generalSetUp() {
@@ -52,7 +51,8 @@ void generalSetUp() {
KafkaStreamsExecutionContext.registerProperties(properties);
KafkaStreamsExecutionContext.setSerdesConfig(Collections
- .singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://" + getClass().getName()));
+ .singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
+ "mock://" + getClass().getName()));
var starter = getKafkaStreamsStarter();
@@ -61,20 +61,22 @@ void generalSetUp() {
StreamsBuilder streamsBuilder = new StreamsBuilder();
starter.topology(streamsBuilder);
- testDriver = new TopologyTestDriver(streamsBuilder.build(), properties, getInitialWallClockTime());
+ testDriver =
+ new TopologyTestDriver(streamsBuilder.build(), properties, getInitialWallClockTime());
- dlqTopic = testDriver.createOutputTopic(KafkaStreamsExecutionContext.getDlqTopicName(), new StringDeserializer(), SerdesUtils.getSerdesForValue().deserializer());
+ dlqTopic = testDriver.createOutputTopic(KafkaStreamsExecutionContext.getDlqTopicName(),
+ new StringDeserializer(), SerdesUtils.getSerdesForValue().deserializer());
}
/**
- * Method to override to provide the KafkaStreamsStarter to test
+ * Method to override to provide the KafkaStreamsStarter to test.
*
* @return The KafkaStreamsStarter to test
*/
protected abstract KafkaStreamsStarter getKafkaStreamsStarter();
/**
- * Default base wall clock time for topology test driver
+ * Default base wall clock time for topology test driver.
*
* @return The default wall clock time as instant
*/
@@ -83,7 +85,7 @@ protected Instant getInitialWallClockTime() {
}
/**
- * Method to close everything properly at the end of the test
+ * Method to close everything properly at the end of the test.
*/
@AfterEach
void generalTearDown() throws IOException {
@@ -93,26 +95,31 @@ void generalTearDown() throws IOException {
}
/**
- * Creates an input test topic on the testDriver using the provided topicWithSerde
+ * Creates an input test topic on the testDriver using the provided topicWithSerde.
*
* @param topicWithSerde The topic with serde used to create the test topic
* @param <K> The serializable type of the key
* @param <V> The serializable type of the value
* @return The corresponding TestInputTopic
*/
- protected <K, V> TestInputTopic<K, V> createInputTestTopic(TopicWithSerde<K, V> topicWithSerde) {
- return this.testDriver.createInputTopic(topicWithSerde.getUnPrefixedName(), topicWithSerde.getKeySerde().serializer(), topicWithSerde.getValueSerde().serializer());
+ protected <K, V> TestInputTopic<K, V> createInputTestTopic(
+ TopicWithSerde<K, V> topicWithSerde) {
+ return this.testDriver.createInputTopic(topicWithSerde.getUnPrefixedName(),
+ topicWithSerde.getKeySerde().serializer(), topicWithSerde.getValueSerde().serializer());
}
/**
- * Creates an output test topic on the testDriver using the provided topicWithSerde
+ * Creates an output test topic on the testDriver using the provided topicWithSerde.
*
* @param topicWithSerde The topic with serde used to create the test topic
* @param <K> The serializable type of the key
* @param <V> The serializable type of the value
* @return The corresponding TestOutputTopic
*/
- protected <K, V> TestOutputTopic<K, V> createOutputTestTopic(TopicWithSerde<K, V> topicWithSerde) {
- return this.testDriver.createOutputTopic(topicWithSerde.getUnPrefixedName(), topicWithSerde.getKeySerde().deserializer(), topicWithSerde.getValueSerde().deserializer());
+ protected <K, V> TestOutputTopic<K, V> createOutputTestTopic(
+ TopicWithSerde<K, V> topicWithSerde) {
+ return this.testDriver.createOutputTopic(topicWithSerde.getUnPrefixedName(),
+ topicWithSerde.getKeySerde().deserializer(),
+ topicWithSerde.getValueSerde().deserializer());
}
}
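To illustrate how this harness is consumed, here is a minimal sketch of a test extending `KafkaStreamsStarterTest`; the starter `MyStreams` and the `TopicWithSerde` constants in `MyTopics` are hypothetical stand-ins for application code:

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.michelin.kstreamplify.KafkaStreamsStarterTest;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class MyStreamsTest extends KafkaStreamsStarterTest {
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;

    @Override
    protected KafkaStreamsStarter getKafkaStreamsStarter() {
        // The starter whose topology() is under test (hypothetical)
        return new MyStreams();
    }

    @BeforeEach
    void setUp() {
        // MyTopics.INPUT and MyTopics.OUTPUT are hypothetical TopicWithSerde<String, String> instances
        inputTopic = createInputTestTopic(MyTopics.INPUT);
        outputTopic = createOutputTestTopic(MyTopics.OUTPUT);
    }

    @Test
    void shouldForwardRecords() {
        inputTopic.pipeInput("key", "value");
        assertEquals("value", outputTopic.readValue());
    }
}
```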
diff --git a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java
index db52f428..c30c20b1 100644
--- a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java
+++ b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java
@@ -1,10 +1,13 @@
package com.michelin.kstreamplify;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.error.ProcessingResult;
import com.michelin.kstreamplify.error.TopologyErrorHandler;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import com.michelin.kstreamplify.utils.SerdesUtils;
+import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -17,20 +20,17 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
class TopologyErrorHandlerTest extends KafkaStreamsStarterTest {
- private final String AVRO_TOPIC = "avroTopic";
+ private static final String AVRO_TOPIC = "avroTopic";
+ private static final String STRING_TOPIC = "stringTopic";
+ private static final String OUTPUT_AVRO_TOPIC = "outputAvroTopic";
+ private static final String OUTPUT_STRING_TOPIC = "outputStringTopic";
+ private static final String DLQ_TOPIC = "dlqTopic";
+
private TestInputTopic<String, KafkaError> avroInputTopic;
- private final String STRING_TOPIC = "stringTopic";
private TestInputTopic<String, String> stringInputTopic;
- private final String OUTPUT_AVRO_TOPIC = "outputAvroTopic";
private TestOutputTopic<String, KafkaError> avroOutputTopic;
- private final String OUTPUT_STRING_TOPIC = "outputStringTopic";
private TestOutputTopic<String, String> stringOutputTopic;
- private final String DLQ_TOPIC = "dlqTopic";
private TestOutputTopic<String, KafkaError> dlqTopic;
@Override
@@ -44,33 +44,44 @@ public String dlqTopic() {
@Override
public void topology(StreamsBuilder streamsBuilder) {
KStream<String, ProcessingResult<String, String>> stringStream = streamsBuilder
- .stream(STRING_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
- .mapValues(value -> "error".equals(value) ?
- ProcessingResult.fail(new NullPointerException(), value) : ProcessingResult.success(value));
+ .stream(STRING_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
+ .mapValues(value -> "error".equals(value)
+ ? ProcessingResult.fail(new NullPointerException(), value) :
+ ProcessingResult.success(value));
TopologyErrorHandler.catchErrors(stringStream)
- .to(OUTPUT_STRING_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
+ .to(OUTPUT_STRING_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
- KStream<String, ProcessingResult<KafkaError, KafkaError>> avroStream = streamsBuilder
- .stream(AVRO_TOPIC, Consumed.with(Serdes.String(), SerdesUtils.getSerdesForValue()))
- .mapValues(value -> value == null ?
- ProcessingResult.fail(new NullPointerException(), null) : ProcessingResult.success(value));
+ KStream<String, ProcessingResult<KafkaError, KafkaError>> avroStream =
+ streamsBuilder
+ .stream(AVRO_TOPIC, Consumed.with(Serdes.String(),
+ SerdesUtils.getSerdesForValue()))
+ .mapValues(value -> value == null
+ ? ProcessingResult.fail(new NullPointerException(), null) :
+ ProcessingResult.success(value));
TopologyErrorHandler.catchErrors(avroStream)
- .to(OUTPUT_AVRO_TOPIC, Produced.with(Serdes.String(), SerdesUtils.getSerdesForValue()));
+ .to(OUTPUT_AVRO_TOPIC,
+ Produced.with(Serdes.String(), SerdesUtils.getSerdesForValue()));
}
};
}
@BeforeEach
void setUp() {
- stringInputTopic = testDriver.createInputTopic(STRING_TOPIC, new StringSerializer(), new StringSerializer());
- avroInputTopic = testDriver.createInputTopic(AVRO_TOPIC, new StringSerializer(), SerdesUtils.getSerdesForValue().serializer());
-
- stringOutputTopic = testDriver.createOutputTopic(OUTPUT_STRING_TOPIC, new StringDeserializer(), new StringDeserializer());
- avroOutputTopic = testDriver.createOutputTopic(OUTPUT_AVRO_TOPIC, new StringDeserializer(), SerdesUtils.getSerdesForValue().deserializer());
-
- dlqTopic = testDriver.createOutputTopic(DLQ_TOPIC, new StringDeserializer(), SerdesUtils.getSerdesForValue().deserializer());
+ stringInputTopic = testDriver.createInputTopic(STRING_TOPIC, new StringSerializer(),
+ new StringSerializer());
+ avroInputTopic = testDriver.createInputTopic(AVRO_TOPIC, new StringSerializer(),
+ SerdesUtils.getSerdesForValue().serializer());
+
+ stringOutputTopic =
+ testDriver.createOutputTopic(OUTPUT_STRING_TOPIC, new StringDeserializer(),
+ new StringDeserializer());
+ avroOutputTopic = testDriver.createOutputTopic(OUTPUT_AVRO_TOPIC, new StringDeserializer(),
+ SerdesUtils.getSerdesForValue().deserializer());
+
+ dlqTopic = testDriver.createOutputTopic(DLQ_TOPIC, new StringDeserializer(),
+ SerdesUtils.getSerdesForValue().deserializer());
}
@Test
@@ -86,7 +97,7 @@ void shouldContinueWhenProcessingValueIsValid() {
}
@Test
- void shouldSendExceptionToDLQWhenProcessingValueIsInvalid() {
+ void shouldSendExceptionToDlqWhenProcessingValueIsInvalid() {
stringInputTopic.pipeInput("key", "error");
var resultDlq = dlqTopic.readValuesToList();
@@ -100,13 +111,13 @@ void shouldSendExceptionToDLQWhenProcessingValueIsInvalid() {
void shouldContinueWhenProcessingValueIsValidAvro() {
KafkaError avroModel = KafkaError.newBuilder()
- .setTopic("topic")
- .setStack("stack")
- .setPartition(0)
- .setOffset(0)
- .setCause("cause")
- .setValue("value")
- .build();
+ .setTopic("topic")
+ .setStack("stack")
+ .setPartition(0)
+ .setOffset(0)
+ .setCause("cause")
+ .setValue("value")
+ .build();
avroInputTopic.pipeInput("key", avroModel);
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/HttpServerConstants.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/HttpServerConstants.java
index ebd625df..9ee74b92 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/HttpServerConstants.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/HttpServerConstants.java
@@ -4,37 +4,37 @@
import lombok.NoArgsConstructor;
/**
- * HTTP server constants
+ * HTTP server constants.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class HttpServerConstants {
/**
- * Readiness probe path property name
+ * Readiness probe path property name.
*/
public static final String READINESS_PROPERTY = "readiness_path";
/**
- * Liveness probe path property name
+ * Liveness probe path property name.
*/
public static final String LIVENESS_PROPERTY = "liveness_path";
/**
- * Topology property name
+ * Topology property name.
*/
public static final String TOPOLOGY_PROPERTY = "expose_topology_path";
/**
- * Readiness default path
+ * Readiness default path.
*/
public static final String READINESS_DEFAULT_PATH = "ready";
/**
- * Liveness default path
+ * Liveness default path.
*/
public static final String LIVENESS_DEFAULT_PATH = "liveness";
/**
- * Topology default path
+ * Topology default path.
*/
public static final String TOPOLOGY_DEFAULT_PATH = "topology";
}
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/InitializerConstants.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/InitializerConstants.java
index 67ed8daf..1d278b7c 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/InitializerConstants.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/InitializerConstants.java
@@ -4,27 +4,27 @@
import lombok.NoArgsConstructor;
/**
- * Kafka Streams initialization constants
+ * Kafka Streams initialization constants.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class InitializerConstants {
/**
- * Server port property name
+ * Server port property name.
*/
public static final String SERVER_PORT_PROPERTY = "server.port";
/**
- * Default host
+ * Default host.
*/
public static final String LOCALHOST = "localhost";
/**
- * Name of the property containing of the name of the var env containing the IP
+ * Name of the property containing the name of the env var containing the IP.
*/
public static final String IP_SYSTEM_VARIABLE_PROPERTY = "ip.env.var.name";
/**
- * Default var env name containing the IP
+ * Default env var name containing the IP.
*/
public static final String IP_SYSTEM_VARIABLE_DEFAULT = "MY_POD_IP";
}
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/PropertyConstants.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/PropertyConstants.java
index 1a07ac13..bb462dcd 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/PropertyConstants.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/constants/PropertyConstants.java
@@ -4,42 +4,42 @@
import lombok.NoArgsConstructor;
/**
- * Property constants
+ * Property constants.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PropertyConstants {
/**
- * Property separator
+ * Property separator.
*/
public static final String PROPERTY_SEPARATOR = ".";
/**
- * Kafka properties prefix
+ * Kafka properties prefix.
*/
public static final String KAFKA_PROPERTIES_PREFIX = "kafka.properties";
/**
- * Default property file name
+ * Default property file name.
*/
public static final String DEFAULT_PROPERTY_FILE = "application.yml";
/**
- * Prefix property name
+ * Prefix property name.
*/
public static final String PREFIX_PROPERTY_NAME = "prefix";
/**
- * Topic property name
+ * Topic property name.
*/
public static final String TOPIC_PROPERTY_NAME = "topic";
/**
- * Remap property name
+ * Remap property name.
*/
public static final String REMAP_PROPERTY_NAME = "remap";
/**
- * Default prefix property name
+ * Default prefix property name.
*/
public static final String SELF = "self";
}
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java
index 94ca389a..b9a5268c 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java
@@ -1,39 +1,39 @@
package com.michelin.kstreamplify.context;
-import com.michelin.kstreamplify.constants.PropertyConstants;
+import static com.michelin.kstreamplify.constants.PropertyConstants.PREFIX_PROPERTY_NAME;
+import static com.michelin.kstreamplify.constants.PropertyConstants.PROPERTY_SEPARATOR;
+import static com.michelin.kstreamplify.constants.PropertyConstants.SELF;
+
+import java.util.Map;
+import java.util.Properties;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.StreamsConfig;
-import java.util.Map;
-import java.util.Properties;
-
-import static com.michelin.kstreamplify.constants.PropertyConstants.*;
-
/**
- * The class to represent the context of the KStream
+ * The class to represent the context of the KStream.
*/
@Slf4j
public class KafkaStreamsExecutionContext {
/**
- * the DLQ topic name
+ * The DLQ topic name.
*/
@Getter
@Setter
private static String dlqTopicName;
/**
- * the Serdes config Map
+ * The Serdes config Map.
*/
@Getter
@Setter
private static Map<String, String> serdesConfig;
/**
- * the properties of the stream execution context
+ * The properties of the stream execution context.
*/
@Getter
private static Properties properties;
@@ -47,7 +47,6 @@ public class KafkaStreamsExecutionContext {
* prefix:
* self: "myNamespacePrefix."
* }
- *
*/
@Getter
private static String prefix;
@@ -56,7 +55,7 @@ private KafkaStreamsExecutionContext() {
}
/**
- * Register KStream properties
+ * Register KStream properties.
*
* @param properties The Kafka Streams properties
*/
@@ -66,14 +65,17 @@ public static void registerProperties(Properties properties) {
}
prefix = properties.getProperty(PREFIX_PROPERTY_NAME + PROPERTY_SEPARATOR + SELF, "");
- if (StringUtils.isNotBlank(prefix) && properties.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
+ if (StringUtils.isNotBlank(prefix)
+ && properties.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,
- prefix.concat(properties.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)));
+ prefix.concat(properties.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)));
}
KafkaStreamsExecutionContext.properties = properties;
StringBuilder stringBuilderProperties = new StringBuilder("Kafka Stream properties:\n");
- properties.forEach((key, value) -> stringBuilderProperties.append("\t").append(key).append(" = ").append(value).append("\n"));
+ properties.forEach(
+ (key, value) -> stringBuilderProperties.append("\t").append(key).append(" = ")
+ .append(value).append("\n"));
log.info(stringBuilderProperties.toString());
}
}
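A short sketch of the prefixing behavior implemented by `registerProperties`; the property key `prefix.self` is what `PREFIX_PROPERTY_NAME + PROPERTY_SEPARATOR + SELF` resolves to:

```java
import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class PrefixExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "myApp");
        properties.setProperty("prefix.self", "myNamespacePrefix.");

        KafkaStreamsExecutionContext.registerProperties(properties);

        // The prefix is prepended to the application id:
        // prints "myNamespacePrefix.myApp"
        System.out.println(properties.getProperty(StreamsConfig.APPLICATION_ID_CONFIG));
    }
}
```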
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/AvroToJsonConverter.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/AvroToJsonConverter.java
index e1ef992c..d74acf2a 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/AvroToJsonConverter.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/AvroToJsonConverter.java
@@ -19,27 +19,28 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.util.Utf8;
-
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
/**
- * The class to convert Avro to Json
+ * The class to convert Avro to Json.
*/
public class AvroToJsonConverter {
- private AvroToJsonConverter() { }
+ private AvroToJsonConverter() {
+ }
private static final Gson gson = new GsonBuilder()
- .setPrettyPrinting()
- .create();
+ .setPrettyPrinting()
+ .create();
/**
- * Convert the record from avro format to json format
+ * Convert the record from avro format to json format.
+ *
* @param inputRecord the record in avro format
* @return the record in json format
*/
@@ -48,7 +49,8 @@ public static String convertRecord(GenericRecord inputRecord) {
}
/**
- * convert avro to a map for json format
+ * Convert avro to a map for json format.
+ *
* @param inputRecord record in avro
* @return map for json format
*/
@@ -64,15 +66,15 @@ private static Map recordAsMap(GenericRecord inputRecord) {
if (recordValue instanceof List<?> recordValueAsList) {
recordValue = recordValueAsList
- .stream()
- .map(value -> {
- if (value instanceof GenericRecord genericRecord) {
- return recordAsMap(genericRecord);
- } else {
- return value.toString();
- }
- })
- .toList();
+ .stream()
+ .map(value -> {
+ if (value instanceof GenericRecord genericRecord) {
+ return recordAsMap(genericRecord);
+ } else {
+ return value.toString();
+ }
+ })
+ .toList();
}
if (recordValue instanceof Map<?, ?> recordValueAsMap) {
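A brief usage sketch of the converter, reusing the `KafkaError` Avro record that appears elsewhere in this PR:

```java
import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.converter.AvroToJsonConverter;

public class AvroToJsonExample {
    public static void main(String[] args) {
        KafkaError error = KafkaError.newBuilder()
            .setTopic("topic")
            .setStack("stack")
            .setPartition(0)
            .setOffset(0)
            .setCause("cause")
            .setValue("value")
            .build();

        // Returns the record as a pretty-printed JSON string
        String json = AvroToJsonConverter.convertRecord(error);
        System.out.println(json);
    }
}
```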
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java
index b3ba9455..d6fc5bb2 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java
@@ -4,11 +4,6 @@
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
@@ -17,15 +12,20 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecordBase;
/**
- * The class to convert Json to Avro
+ * The class to convert Json to Avro.
*/
public class JsonToAvroConverter {
/**
- * convert a file in json to avro
- * @param file the file in json
+ * Convert a file in json to avro.
+ *
+ * @param file the file in json
* @param schema the avro schema to use
* @return the record in avro
*/
@@ -34,123 +34,149 @@ public static SpecificRecordBase jsonToAvro(String file, Schema schema) {
}
/**
- * convert json to avro
+ * Convert json to avro.
+ *
* @param jsonEvent the json record
- * @param schema the avro schema to use
+ * @param schema the avro schema to use
* @return the record in avro
*/
public static SpecificRecordBase jsonToAvro(JsonObject jsonEvent, Schema schema) {
try {
- SpecificRecordBase record = baseClass(schema.getNamespace(), schema.getName()).getDeclaredConstructor().newInstance();
- populateGenericRecordFromJson(jsonEvent, record);
- return record;
+ SpecificRecordBase message =
+ baseClass(schema.getNamespace(), schema.getName()).getDeclaredConstructor()
+ .newInstance();
+ populateGenericRecordFromJson(jsonEvent, message);
+ return message;
} catch (Exception e) {
return null;
}
}
/**
- * populate avro records from json
+ * Populate avro records from json.
+ *
* @param jsonObject json data to provide to the avro record
- * @param record the avro record to populate
+ * @param message the avro record to populate
*/
- private static void populateGenericRecordFromJson(JsonObject jsonObject, SpecificRecordBase record) {
- // Iterate over object attributes
- jsonObject.keySet().forEach(
- currentKey -> {
- try {
- var currentValue = jsonObject.get(currentKey);
+ private static void populateGenericRecordFromJson(JsonObject jsonObject,
+ SpecificRecordBase message) {
+ // Iterate over object attributes
+ jsonObject.keySet().forEach(
+ currentKey -> {
+ try {
+ var currentValue = jsonObject.get(currentKey);
- // If this is an object, add to prefix and call method again
- if (currentValue instanceof JsonObject currentValueJsonObject) {
- Schema currentSchema = record.getSchema().getField(currentKey).schema();
+ // If this is an object, add to prefix and call method again
+ if (currentValue instanceof JsonObject currentValueJsonObject) {
+ Schema currentSchema = message.getSchema().getField(currentKey).schema();
- // If the current value is a UNION
- if (currentSchema.getType().equals(Schema.Type.UNION)) {
- // Then research the first NOT NULL sub value
- Optional<Schema> notNullSchema = currentSchema.getTypes().stream()
- .filter(s -> !s.getType().equals(Schema.Type.NULL))
- .findAny();
+ // If the current value is a UNION
+ if (currentSchema.getType().equals(Schema.Type.UNION)) {
+ // Then research the first NOT NULL sub value
+ Optional<Schema> notNullSchema = currentSchema.getTypes().stream()
+ .filter(s -> !s.getType().equals(Schema.Type.NULL))
+ .findAny();
- if (notNullSchema.isPresent()) {
- currentSchema = notNullSchema.get();
- }
+ if (notNullSchema.isPresent()) {
+ currentSchema = notNullSchema.get();
}
+ }
- switch (currentSchema.getType()) {
- case RECORD -> {
- SpecificRecordBase currentRecord = baseClass(record.getSchema().getNamespace(), currentSchema.getName()).getDeclaredConstructor().newInstance();
- populateGenericRecordFromJson(currentValueJsonObject, currentRecord);
- record.put(currentKey, currentRecord);
- }
- case MAP -> {
- Map<String, Object> map = new HashMap<>();
- if (!currentSchema.getValueType().getType().equals(Schema.Type.RECORD)) {
- for (String key : currentValueJsonObject.keySet()) {
- Object value = populateFieldWithCorrespondingType(currentValueJsonObject.get(key), currentSchema.getValueType().getType());
- map.put(key, value);
- }
- } else {
- for (String key : currentValueJsonObject.keySet()) {
- SpecificRecordBase mapValueRecord = baseClass(record.getSchema().getNamespace(), currentSchema.getValueType().getName()).getDeclaredConstructor().newInstance();
- populateGenericRecordFromJson(currentValueJsonObject.get(key).getAsJsonObject(), mapValueRecord);
- map.put(key, mapValueRecord);
- }
+ switch (currentSchema.getType()) {
+ case RECORD -> {
+ SpecificRecordBase currentRecord =
+ baseClass(message.getSchema().getNamespace(),
+ currentSchema.getName()).getDeclaredConstructor()
+ .newInstance();
+ populateGenericRecordFromJson(currentValueJsonObject,
+ currentRecord);
+ message.put(currentKey, currentRecord);
+ }
+ case MAP -> {
+ Map<String, Object> map = new HashMap<>();
+ if (!currentSchema.getValueType().getType()
+ .equals(Schema.Type.RECORD)) {
+ for (String key : currentValueJsonObject.keySet()) {
+ Object value = populateFieldWithCorrespondingType(
+ currentValueJsonObject.get(key),
+ currentSchema.getValueType().getType());
+ map.put(key, value);
+ }
+ } else {
+ for (String key : currentValueJsonObject.keySet()) {
+ SpecificRecordBase mapValueRecord =
+ baseClass(message.getSchema().getNamespace(),
+ currentSchema.getValueType()
+ .getName()).getDeclaredConstructor()
+ .newInstance();
+ populateGenericRecordFromJson(
+ currentValueJsonObject.get(key).getAsJsonObject(),
+ mapValueRecord);
+ map.put(key, mapValueRecord);
}
- record.put(currentKey, map);
}
- default -> record.put(currentKey,
- populateFieldWithCorrespondingType(currentValue, currentSchema.getType()));
+ message.put(currentKey, map);
}
+ default -> message.put(currentKey,
+ populateFieldWithCorrespondingType(currentValue,
+ currentSchema.getType()));
}
-
+ } else if (currentValue instanceof JsonArray jsonArray) {
// If this is an Array, call method for each one of them
- else if (currentValue instanceof JsonArray jsonArray) {
- var arraySchema = record.getSchema().getField(currentKey).schema();
- Schema arrayType = arraySchema.getType() != Schema.Type.UNION ?
- arraySchema :
- arraySchema.getTypes().stream()
- .filter(s -> s.getType() != Schema.Type.NULL)
- .findFirst().get();
- Schema elementType = arrayType.getElementType();
+ var arraySchema = message.getSchema().getField(currentKey).schema();
+ Schema arrayType = arraySchema.getType() != Schema.Type.UNION
+ ? arraySchema :
+ arraySchema.getTypes().stream()
+ .filter(s -> s.getType() != Schema.Type.NULL)
+ .findFirst().get();
+ Schema elementType = arrayType.getElementType();
- if (elementType != null && Schema.Type.RECORD.equals(elementType.getType())) {
- ArrayList<SpecificRecordBase> recordArray = new ArrayList<>();
- for (int i = 0; i < jsonArray.size(); i++) {
- SpecificRecordBase currentRecord = baseClass(record.getSchema().getNamespace(), elementType.getName()).getDeclaredConstructor().newInstance();
- populateGenericRecordFromJson((JsonObject) jsonArray.get(i), currentRecord);
- recordArray.add(currentRecord);
- }
- record.put(currentKey, recordArray);
- } else {
- ArrayList