Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Check Style #92

Merged
merged 5 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
382 changes: 382 additions & 0 deletions .checkstyle/checkstyle.xml

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions .github/workflows/on_pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ jobs:
distribution: 'temurin'
cache: maven

- name: Check Style
run: mvn checkstyle:check

- name: Build
run: mvn clean compile

Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/on_push_main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ jobs:
gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }}
gpg-passphrase: MAVEN_GPG_PASSPHRASE

- name: Check Style
run: mvn checkstyle:check

- name: Build
id: build
run: |
Expand Down
Binary file added .readme/contributing/check_style.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added .readme/contributing/reformat_code.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added .readme/contributing/save_actions.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added .readme/contributing/scan.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
27 changes: 26 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,32 @@ In general, we follow the ["fork-and-pull" Git workflow](https://github.com/susa
- Push changes to your fork
- Open a PR in our repository targeting master and follow the PR template so that we can efficiently review the changes.

## Styleguides
## Style Guide

### Code Style

We maintain a consistent code style using Checkstyle.

#### IntelliJ

To ensure code style consistency in IntelliJ, follow these steps:

1. Install the [CheckStyle-IDEA plugin](https://plugins.jetbrains.com/plugin/1065-checkstyle-idea).
2. Create a new CheckStyle configuration for Kstreamplify based on the code style configuration located in the `.checkstyle` folder. Configure it as follows:

![check_style.png](.readme%2Fcontributing%2Fcheck_style.png)

3. Enable the "Reformat code" and "Optimize imports" options in the save actions:

![save_actions.png](.readme%2Fcontributing%2Fsave_actions.png)

4. Reformat your code with the Checkstyle configuration:

![reformat_code.png](.readme%2Fcontributing%2Freformat_code.png)

5. Before committing your changes, ensure your contribution doesn't introduce any problems by running a scan:

![scan.png](.readme%2Fcontributing%2Fscan.png)

### Git Commit Messages

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
import com.michelin.kstreamplify.utils.TopicWithSerde;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
Expand All @@ -16,32 +22,25 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Collections;
import java.util.Properties;

/**
* <p>The main test class to extend to execute unit tests on topology</p>
 * <p>The main test class to extend to execute unit tests on topology.</p>
* <p>It provides a {@link TopologyTestDriver} and a {@link TestOutputTopic} for the DLQ</p>
*/
public abstract class KafkaStreamsStarterTest {
private static final String STATE_DIR = "/tmp/kafka-streams/";

/**
* The topology test driver
* The topology test driver.
*/
protected TopologyTestDriver testDriver;

/**
* The dlq topic, initialized in {@link #generalSetUp()}
* The dlq topic, initialized in {@link #generalSetUp()}.
*/
protected TestOutputTopic<String, KafkaError> dlqTopic;

/**
* Set up topology test driver
* Set up topology test driver.
*/
@BeforeEach
void generalSetUp() {
Expand All @@ -52,7 +51,8 @@ void generalSetUp() {

KafkaStreamsExecutionContext.registerProperties(properties);
KafkaStreamsExecutionContext.setSerdesConfig(Collections
.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://" + getClass().getName()));
.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"mock://" + getClass().getName()));

var starter = getKafkaStreamsStarter();

Expand All @@ -61,20 +61,22 @@ void generalSetUp() {
StreamsBuilder streamsBuilder = new StreamsBuilder();
starter.topology(streamsBuilder);

testDriver = new TopologyTestDriver(streamsBuilder.build(), properties, getInitialWallClockTime());
testDriver =
new TopologyTestDriver(streamsBuilder.build(), properties, getInitialWallClockTime());

dlqTopic = testDriver.createOutputTopic(KafkaStreamsExecutionContext.getDlqTopicName(), new StringDeserializer(), SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
dlqTopic = testDriver.createOutputTopic(KafkaStreamsExecutionContext.getDlqTopicName(),
new StringDeserializer(), SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
}

/**
* Method to override to provide the KafkaStreamsStarter to test
* Method to override to provide the KafkaStreamsStarter to test.
*
* @return The KafkaStreamsStarter to test
*/
protected abstract KafkaStreamsStarter getKafkaStreamsStarter();

/**
* Default base wall clock time for topology test driver
* Default base wall clock time for topology test driver.
*
* @return The default wall clock time as instant
*/
Expand All @@ -83,7 +85,7 @@ protected Instant getInitialWallClockTime() {
}

/**
* Method to close everything properly at the end of the test
* Method to close everything properly at the end of the test.
*/
@AfterEach
void generalTearDown() throws IOException {
Expand All @@ -93,26 +95,31 @@ void generalTearDown() throws IOException {
}

/**
* Creates an input test topic on the testDriver using the provided topicWithSerde
* Creates an input test topic on the testDriver using the provided topicWithSerde.
*
 * @param topicWithSerde The topic with serde used to create the test topic
* @param <K> The serializable type of the key
* @param <V> The serializable type of the value
* @return The corresponding TestInputTopic
*/
protected <K, V> TestInputTopic<K, V> createInputTestTopic(TopicWithSerde<K, V> topicWithSerde) {
return this.testDriver.createInputTopic(topicWithSerde.getUnPrefixedName(), topicWithSerde.getKeySerde().serializer(), topicWithSerde.getValueSerde().serializer());
protected <K, V> TestInputTopic<K, V> createInputTestTopic(
TopicWithSerde<K, V> topicWithSerde) {
return this.testDriver.createInputTopic(topicWithSerde.getUnPrefixedName(),
topicWithSerde.getKeySerde().serializer(), topicWithSerde.getValueSerde().serializer());
}

/**
* Creates an output test topic on the testDriver using the provided topicWithSerde
* Creates an output test topic on the testDriver using the provided topicWithSerde.
*
 * @param topicWithSerde The topic with serde used to create the test topic
* @param <K> The serializable type of the key
* @param <V> The serializable type of the value
* @return The corresponding TestOutputTopic
*/
protected <K, V> TestOutputTopic<K, V> createOutputTestTopic(TopicWithSerde<K, V> topicWithSerde) {
return this.testDriver.createOutputTopic(topicWithSerde.getUnPrefixedName(), topicWithSerde.getKeySerde().deserializer(), topicWithSerde.getValueSerde().deserializer());
protected <K, V> TestOutputTopic<K, V> createOutputTestTopic(
TopicWithSerde<K, V> topicWithSerde) {
return this.testDriver.createOutputTopic(topicWithSerde.getUnPrefixedName(),
topicWithSerde.getKeySerde().deserializer(),
topicWithSerde.getValueSerde().deserializer());
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.michelin.kstreamplify;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.error.ProcessingResult;
import com.michelin.kstreamplify.error.TopologyErrorHandler;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import com.michelin.kstreamplify.utils.SerdesUtils;
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
Expand All @@ -17,20 +20,17 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;

class TopologyErrorHandlerTest extends KafkaStreamsStarterTest {
private final String AVRO_TOPIC = "avroTopic";
private static final String AVRO_TOPIC = "avroTopic";
private static final String STRING_TOPIC = "stringTopic";
private static final String OUTPUT_AVRO_TOPIC = "outputAvroTopic";
private static final String OUTPUT_STRING_TOPIC = "outputStringTopic";
private static final String DLQ_TOPIC = "dlqTopic";

private TestInputTopic<String, KafkaError> avroInputTopic;
private final String STRING_TOPIC = "stringTopic";
private TestInputTopic<String, String> stringInputTopic;
private final String OUTPUT_AVRO_TOPIC = "outputAvroTopic";
private TestOutputTopic<String, KafkaError> avroOutputTopic;
private final String OUTPUT_STRING_TOPIC = "outputStringTopic";
private TestOutputTopic<String, String> stringOutputTopic;
private final String DLQ_TOPIC = "dlqTopic";
private TestOutputTopic<String, KafkaError> dlqTopic;

@Override
Expand All @@ -44,33 +44,44 @@ public String dlqTopic() {
@Override
public void topology(StreamsBuilder streamsBuilder) {
KStream<String, ProcessingResult<String, String>> stringStream = streamsBuilder
.stream(STRING_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(value -> "error".equals(value) ?
ProcessingResult.fail(new NullPointerException(), value) : ProcessingResult.success(value));
.stream(STRING_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(value -> "error".equals(value)
? ProcessingResult.fail(new NullPointerException(), value) :
ProcessingResult.success(value));

TopologyErrorHandler.catchErrors(stringStream)
.to(OUTPUT_STRING_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
.to(OUTPUT_STRING_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

KStream<String, ProcessingResult<KafkaError, KafkaError>> avroStream = streamsBuilder
.stream(AVRO_TOPIC, Consumed.with(Serdes.String(), SerdesUtils.<KafkaError>getSerdesForValue()))
.mapValues(value -> value == null ?
ProcessingResult.fail(new NullPointerException(), null) : ProcessingResult.success(value));
KStream<String, ProcessingResult<KafkaError, KafkaError>> avroStream =
streamsBuilder
.stream(AVRO_TOPIC, Consumed.with(Serdes.String(),
SerdesUtils.<KafkaError>getSerdesForValue()))
.mapValues(value -> value == null
? ProcessingResult.fail(new NullPointerException(), null) :
ProcessingResult.success(value));

TopologyErrorHandler.catchErrors(avroStream)
.to(OUTPUT_AVRO_TOPIC, Produced.with(Serdes.String(), SerdesUtils.getSerdesForValue()));
.to(OUTPUT_AVRO_TOPIC,
Produced.with(Serdes.String(), SerdesUtils.getSerdesForValue()));
}
};
}

@BeforeEach
void setUp() {
stringInputTopic = testDriver.createInputTopic(STRING_TOPIC, new StringSerializer(), new StringSerializer());
avroInputTopic = testDriver.createInputTopic(AVRO_TOPIC, new StringSerializer(), SerdesUtils.<KafkaError>getSerdesForValue().serializer());

stringOutputTopic = testDriver.createOutputTopic(OUTPUT_STRING_TOPIC, new StringDeserializer(), new StringDeserializer());
avroOutputTopic = testDriver.createOutputTopic(OUTPUT_AVRO_TOPIC, new StringDeserializer(), SerdesUtils.<KafkaError>getSerdesForValue().deserializer());

dlqTopic = testDriver.createOutputTopic(DLQ_TOPIC, new StringDeserializer(), SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
stringInputTopic = testDriver.createInputTopic(STRING_TOPIC, new StringSerializer(),
new StringSerializer());
avroInputTopic = testDriver.createInputTopic(AVRO_TOPIC, new StringSerializer(),
SerdesUtils.<KafkaError>getSerdesForValue().serializer());

stringOutputTopic =
testDriver.createOutputTopic(OUTPUT_STRING_TOPIC, new StringDeserializer(),
new StringDeserializer());
avroOutputTopic = testDriver.createOutputTopic(OUTPUT_AVRO_TOPIC, new StringDeserializer(),
SerdesUtils.<KafkaError>getSerdesForValue().deserializer());

dlqTopic = testDriver.createOutputTopic(DLQ_TOPIC, new StringDeserializer(),
SerdesUtils.<KafkaError>getSerdesForValue().deserializer());
}

@Test
Expand All @@ -86,7 +97,7 @@ void shouldContinueWhenProcessingValueIsValid() {
}

@Test
void shouldSendExceptionToDLQWhenProcessingValueIsInvalid() {
void shouldSendExceptionToDlqWhenProcessingValueIsInvalid() {
stringInputTopic.pipeInput("key", "error");

var resultDlq = dlqTopic.readValuesToList();
Expand All @@ -100,13 +111,13 @@ void shouldSendExceptionToDLQWhenProcessingValueIsInvalid() {
void shouldContinueWhenProcessingValueIsValidAvro() {

KafkaError avroModel = KafkaError.newBuilder()
.setTopic("topic")
.setStack("stack")
.setPartition(0)
.setOffset(0)
.setCause("cause")
.setValue("value")
.build();
.setTopic("topic")
.setStack("stack")
.setPartition(0)
.setOffset(0)
.setCause("cause")
.setValue("value")
.build();

avroInputTopic.pipeInput("key", avroModel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,37 @@
import lombok.NoArgsConstructor;

/**
* HTTP server constants
* HTTP server constants.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class HttpServerConstants {
/**
* Readiness probe path property name
* Readiness probe path property name.
*/
public static final String READINESS_PROPERTY = "readiness_path";

/**
* Liveness probe path property name
* Liveness probe path property name.
*/
public static final String LIVENESS_PROPERTY = "liveness_path";

/**
* Topology property name
* Topology property name.
*/
public static final String TOPOLOGY_PROPERTY = "expose_topology_path";

/**
* Readiness default path
* Readiness default path.
*/
public static final String READINESS_DEFAULT_PATH = "ready";

/**
* Liveness default path
* Liveness default path.
*/
public static final String LIVENESS_DEFAULT_PATH = "liveness";

/**
* Topology default path
* Topology default path.
*/
public static final String TOPOLOGY_DEFAULT_PATH = "topology";
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,27 @@
import lombok.NoArgsConstructor;

/**
* Kafka Streams initialization constants
* Kafka Streams initialization constants.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class InitializerConstants {
/**
* Server port property name
* Server port property name.
*/
public static final String SERVER_PORT_PROPERTY = "server.port";

/**
* Default host
* Default host.
*/
public static final String LOCALHOST = "localhost";

/**
 * Name of the property containing the name of the environment variable containing the IP
 * Name of the property containing the name of the environment variable containing the IP.
*/
public static final String IP_SYSTEM_VARIABLE_PROPERTY = "ip.env.var.name";

/**
* Default var env name containing the IP
* Default var env name containing the IP.
*/
public static final String IP_SYSTEM_VARIABLE_DEFAULT = "MY_POD_IP";
}
Loading