Skip to content

Commit

Permalink
Add Check Style (#92)
Browse files Browse the repository at this point in the history
* Add Check Style

* Fix code smells

* Switch check style and build steps

* Fix lint
  • Loading branch information
Loïc GREFFIER committed Sep 26, 2023
1 parent d3098d5 commit cf20a0a
Show file tree
Hide file tree
Showing 49 changed files with 1,811 additions and 1,277 deletions.
382 changes: 382 additions & 0 deletions .checkstyle/checkstyle.xml

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions .github/workflows/on_pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ jobs:
distribution: 'temurin'
cache: maven

- name: Check Style
run: mvn checkstyle:check

- name: Build
run: mvn clean compile

Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/on_push_main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ jobs:
gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }}
gpg-passphrase: MAVEN_GPG_PASSPHRASE

- name: Check Style
run: mvn checkstyle:check

- name: Build
id: build
run: |
Expand Down
Binary file added .readme/contributing/check_style.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added .readme/contributing/reformat_code.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added .readme/contributing/save_actions.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added .readme/contributing/scan.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
27 changes: 26 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,32 @@ In general, we follow the ["fork-and-pull" Git workflow](https://github.com/susa
- Push changes to your fork
- Open a PR in our repository targeting master and follow the PR template so that we can efficiently review the changes.

## Styleguides
## Style Guide

### Code Style

We maintain a consistent code style using Checkstyle.

#### IntelliJ

To ensure code style consistency in IntelliJ, follow these steps:

1. Install the [CheckStyle-IDEA plugin](https://plugins.jetbrains.com/plugin/1065-checkstyle-idea).
2. Create a new CheckStyle configuration for Kstreamplify based on the code style configuration located in the `.checkstyle` folder. Configure it as follows:

![check_style.png](.readme%2Fcontributing%2Fcheck_style.png)

3. Enable the "Reformat code" and "Optimize imports" options in the save actions:

![save_actions.png](.readme%2Fcontributing%2Fsave_actions.png)

4. Reformat your code with the Checkstyle configuration:

![reformat_code.png](.readme%2Fcontributing%2Freformat_code.png)

5. Before committing your changes, ensure your contribution doesn't introduce any problems by running a scan:

![scan.png](.readme%2Fcontributing%2Fscan.png)

### Git Commit Messages

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
import com.michelin.kstreamplify.utils.TopicWithSerde;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
Expand All @@ -16,32 +22,25 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Collections;
import java.util.Properties;

/**
* <p>The main test class to extend to execute unit tests on topology</p>
* <p>The main test class to extend to execute unit tests on topology</p>.
* <p>It provides a {@link TopologyTestDriver} and a {@link TestOutputTopic} for the DLQ</p>
*/
public abstract class KafkaStreamsStarterTest {
private static final String STATE_DIR = "/tmp/kafka-streams/";

/**
* The topology test driver
* The topology test driver.
*/
protected TopologyTestDriver testDriver;

/**
* The dlq topic, initialized in {@link #generalSetUp()}
* The dlq topic, initialized in {@link #generalSetUp()}.
*/
protected TestOutputTopic<String, KafkaError> dlqTopic;

/**
* Set up topology test driver
* Set up topology test driver.
*/
@BeforeEach
void generalSetUp() {
Expand All @@ -52,7 +51,8 @@ void generalSetUp() {

KafkaStreamsExecutionContext.registerProperties(properties);
KafkaStreamsExecutionContext.setSerdesConfig(Collections
.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://" + getClass().getName()));
.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"mock://" + getClass().getName()));

var starter = getKafkaStreamsStarter();

Expand All @@ -61,20 +61,22 @@ void generalSetUp() {
StreamsBuilder streamsBuilder = new StreamsBuilder();
starter.topology(streamsBuilder);

testDriver = new TopologyTestDriver(streamsBuilder.build(), properties, getInitialWallClockTime());
testDriver =
new TopologyTestDriver(streamsBuilder.build(), properties, getInitialWallClockTime());

dlqTopic = testDriver.createOutputTopic(KafkaStreamsExecutionContext.getDlqTopicName(), new StringDeserializer(), SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
dlqTopic = testDriver.createOutputTopic(KafkaStreamsExecutionContext.getDlqTopicName(),
new StringDeserializer(), SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
}

/**
* Method to override to provide the KafkaStreamsStarter to test
* Method to override to provide the KafkaStreamsStarter to test.
*
* @return The KafkaStreamsStarter to test
*/
protected abstract KafkaStreamsStarter getKafkaStreamsStarter();

/**
* Default base wall clock time for topology test driver
* Default base wall clock time for topology test driver.
*
* @return The default wall clock time as instant
*/
Expand All @@ -83,7 +85,7 @@ protected Instant getInitialWallClockTime() {
}

/**
* Method to close everything properly at the end of the test
* Method to close everything properly at the end of the test.
*/
@AfterEach
void generalTearDown() throws IOException {
Expand All @@ -93,26 +95,31 @@ void generalTearDown() throws IOException {
}

/**
* Creates an input test topic on the testDriver using the provided topicWithSerde
* Creates an input test topic on the testDriver using the provided topicWithSerde.
*
* @param topicWithSerde The topic with serde used to crete the test topic
* @param <K> The serializable type of the key
* @param <V> The serializable type of the value
* @return The corresponding TestInputTopic
*/
protected <K, V> TestInputTopic<K, V> createInputTestTopic(TopicWithSerde<K, V> topicWithSerde) {
return this.testDriver.createInputTopic(topicWithSerde.getUnPrefixedName(), topicWithSerde.getKeySerde().serializer(), topicWithSerde.getValueSerde().serializer());
protected <K, V> TestInputTopic<K, V> createInputTestTopic(
TopicWithSerde<K, V> topicWithSerde) {
return this.testDriver.createInputTopic(topicWithSerde.getUnPrefixedName(),
topicWithSerde.getKeySerde().serializer(), topicWithSerde.getValueSerde().serializer());
}

/**
* Creates an output test topic on the testDriver using the provided topicWithSerde
* Creates an output test topic on the testDriver using the provided topicWithSerde.
*
* @param topicWithSerde The topic with serde used to crete the test topic
* @param <K> The serializable type of the key
* @param <V> The serializable type of the value
* @return The corresponding TestOutputTopic
*/
protected <K, V> TestOutputTopic<K, V> createOutputTestTopic(TopicWithSerde<K, V> topicWithSerde) {
return this.testDriver.createOutputTopic(topicWithSerde.getUnPrefixedName(), topicWithSerde.getKeySerde().deserializer(), topicWithSerde.getValueSerde().deserializer());
protected <K, V> TestOutputTopic<K, V> createOutputTestTopic(
TopicWithSerde<K, V> topicWithSerde) {
return this.testDriver.createOutputTopic(topicWithSerde.getUnPrefixedName(),
topicWithSerde.getKeySerde().deserializer(),
topicWithSerde.getValueSerde().deserializer());
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.michelin.kstreamplify;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.error.ProcessingResult;
import com.michelin.kstreamplify.error.TopologyErrorHandler;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import com.michelin.kstreamplify.utils.SerdesUtils;
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
Expand All @@ -17,20 +20,17 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;

class TopologyErrorHandlerTest extends KafkaStreamsStarterTest {
private final String AVRO_TOPIC = "avroTopic";
private static final String AVRO_TOPIC = "avroTopic";
private static final String STRING_TOPIC = "stringTopic";
private static final String OUTPUT_AVRO_TOPIC = "outputAvroTopic";
private static final String OUTPUT_STRING_TOPIC = "outputStringTopic";
private static final String DLQ_TOPIC = "dlqTopic";

private TestInputTopic<String, KafkaError> avroInputTopic;
private final String STRING_TOPIC = "stringTopic";
private TestInputTopic<String, String> stringInputTopic;
private final String OUTPUT_AVRO_TOPIC = "outputAvroTopic";
private TestOutputTopic<String, KafkaError> avroOutputTopic;
private final String OUTPUT_STRING_TOPIC = "outputStringTopic";
private TestOutputTopic<String, String> stringOutputTopic;
private final String DLQ_TOPIC = "dlqTopic";
private TestOutputTopic<String, KafkaError> dlqTopic;

@Override
Expand All @@ -44,33 +44,44 @@ public String dlqTopic() {
@Override
public void topology(StreamsBuilder streamsBuilder) {
KStream<String, ProcessingResult<String, String>> stringStream = streamsBuilder
.stream(STRING_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(value -> "error".equals(value) ?
ProcessingResult.fail(new NullPointerException(), value) : ProcessingResult.success(value));
.stream(STRING_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(value -> "error".equals(value)
? ProcessingResult.fail(new NullPointerException(), value) :
ProcessingResult.success(value));

TopologyErrorHandler.catchErrors(stringStream)
.to(OUTPUT_STRING_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
.to(OUTPUT_STRING_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

KStream<String, ProcessingResult<KafkaError, KafkaError>> avroStream = streamsBuilder
.stream(AVRO_TOPIC, Consumed.with(Serdes.String(), SerdesUtils.<KafkaError>getSerdesForValue()))
.mapValues(value -> value == null ?
ProcessingResult.fail(new NullPointerException(), null) : ProcessingResult.success(value));
KStream<String, ProcessingResult<KafkaError, KafkaError>> avroStream =
streamsBuilder
.stream(AVRO_TOPIC, Consumed.with(Serdes.String(),
SerdesUtils.<KafkaError>getSerdesForValue()))
.mapValues(value -> value == null
? ProcessingResult.fail(new NullPointerException(), null) :
ProcessingResult.success(value));

TopologyErrorHandler.catchErrors(avroStream)
.to(OUTPUT_AVRO_TOPIC, Produced.with(Serdes.String(), SerdesUtils.getSerdesForValue()));
.to(OUTPUT_AVRO_TOPIC,
Produced.with(Serdes.String(), SerdesUtils.getSerdesForValue()));
}
};
}

@BeforeEach
void setUp() {
stringInputTopic = testDriver.createInputTopic(STRING_TOPIC, new StringSerializer(), new StringSerializer());
avroInputTopic = testDriver.createInputTopic(AVRO_TOPIC, new StringSerializer(), SerdesUtils.<KafkaError>getSerdesForValue().serializer());

stringOutputTopic = testDriver.createOutputTopic(OUTPUT_STRING_TOPIC, new StringDeserializer(), new StringDeserializer());
avroOutputTopic = testDriver.createOutputTopic(OUTPUT_AVRO_TOPIC, new StringDeserializer(), SerdesUtils.<KafkaError>getSerdesForValue().deserializer());

dlqTopic = testDriver.createOutputTopic(DLQ_TOPIC, new StringDeserializer(), SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
stringInputTopic = testDriver.createInputTopic(STRING_TOPIC, new StringSerializer(),
new StringSerializer());
avroInputTopic = testDriver.createInputTopic(AVRO_TOPIC, new StringSerializer(),
SerdesUtils.<KafkaError>getSerdesForValue().serializer());

stringOutputTopic =
testDriver.createOutputTopic(OUTPUT_STRING_TOPIC, new StringDeserializer(),
new StringDeserializer());
avroOutputTopic = testDriver.createOutputTopic(OUTPUT_AVRO_TOPIC, new StringDeserializer(),
SerdesUtils.<KafkaError>getSerdesForValue().deserializer());

dlqTopic = testDriver.createOutputTopic(DLQ_TOPIC, new StringDeserializer(),
SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
}

@Test
Expand All @@ -86,7 +97,7 @@ void shouldContinueWhenProcessingValueIsValid() {
}

@Test
void shouldSendExceptionToDLQWhenProcessingValueIsInvalid() {
void shouldSendExceptionToDlqWhenProcessingValueIsInvalid() {
stringInputTopic.pipeInput("key", "error");

var resultDlq = dlqTopic.readValuesToList();
Expand All @@ -100,13 +111,13 @@ void shouldSendExceptionToDLQWhenProcessingValueIsInvalid() {
void shouldContinueWhenProcessingValueIsValidAvro() {

KafkaError avroModel = KafkaError.newBuilder()
.setTopic("topic")
.setStack("stack")
.setPartition(0)
.setOffset(0)
.setCause("cause")
.setValue("value")
.build();
.setTopic("topic")
.setStack("stack")
.setPartition(0)
.setOffset(0)
.setCause("cause")
.setValue("value")
.build();

avroInputTopic.pipeInput("key", avroModel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,37 @@
import lombok.NoArgsConstructor;

/**
* HTTP server constants
* HTTP server constants.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class HttpServerConstants {
/**
* Readiness probe path property name
* Readiness probe path property name.
*/
public static final String READINESS_PROPERTY = "readiness_path";

/**
* Liveness probe path property name
* Liveness probe path property name.
*/
public static final String LIVENESS_PROPERTY = "liveness_path";

/**
* Topology property name
* Topology property name.
*/
public static final String TOPOLOGY_PROPERTY = "expose_topology_path";

/**
* Readiness default path
* Readiness default path.
*/
public static final String READINESS_DEFAULT_PATH = "ready";

/**
* Liveness default path
* Liveness default path.
*/
public static final String LIVENESS_DEFAULT_PATH = "liveness";

/**
* Topology default path
* Topology default path.
*/
public static final String TOPOLOGY_DEFAULT_PATH = "topology";
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,27 @@
import lombok.NoArgsConstructor;

/**
* Kafka Streams initialization constants
* Kafka Streams initialization constants.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class InitializerConstants {
/**
* Server port property name
* Server port property name.
*/
public static final String SERVER_PORT_PROPERTY = "server.port";

/**
* Default host
* Default host.
*/
public static final String LOCALHOST = "localhost";

/**
* Name of the property containing of the name of the var env containing the IP
* Name of the property containing of the name of the var env containing the IP.
*/
public static final String IP_SYSTEM_VARIABLE_PROPERTY = "ip.env.var.name";

/**
* Default var env name containing the IP
* Default var env name containing the IP.
*/
public static final String IP_SYSTEM_VARIABLE_DEFAULT = "MY_POD_IP";
}
Loading

0 comments on commit cf20a0a

Please sign in to comment.