diff --git a/.github/workflows/on_pull_request.yml b/.github/workflows/on_pull_request.yml
index bce9442f..9cd5c772 100644
--- a/.github/workflows/on_pull_request.yml
+++ b/.github/workflows/on_pull_request.yml
@@ -28,7 +28,7 @@ jobs:
       - name: Publish Test Report
         if: always()
-        uses: mikepenz/action-junit-report@v3
+        uses: mikepenz/action-junit-report@v4
         with:
           report_paths: '**/target/surefire-reports/TEST-*.xml'
diff --git a/.github/workflows/on_push_main.yml b/.github/workflows/on_push_main.yml
index b6cf53ee..accf452e 100644
--- a/.github/workflows/on_push_main.yml
+++ b/.github/workflows/on_push_main.yml
@@ -36,7 +36,7 @@ jobs:
       - name: Publish Test Report
         if: always()
-        uses: mikepenz/action-junit-report@v3
+        uses: mikepenz/action-junit-report@v4
         with:
           report_paths: '**/target/surefire-reports/TEST-*.xml'
diff --git a/.github/workflows/tag.yml b/.github/workflows/tag.yml
index ad138a9e..7786aa55 100644
--- a/.github/workflows/tag.yml
+++ b/.github/workflows/tag.yml
@@ -25,7 +25,7 @@ jobs:
           cache: maven
 
       - name: Import GPG key
-        uses: crazy-max/ghaction-import-gpg@v5
+        uses: crazy-max/ghaction-import-gpg@v6
         with:
          gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }}
          git_user_signingkey: true
diff --git a/README.md b/README.md
index eb5a2b20..a1976907 100644
--- a/README.md
+++ b/README.md
@@ -20,6 +20,8 @@ Kstreamplify is a Java library that brings new features on top of Kafka Streams.
   * [Production and Deserialization](#production-and-deserialization)
   * [Avro Schema](#avro-schema)
   * [REST Endpoints](#rest-endpoints)
+  * [Hooks](#hooks)
+    * [On Start](#on-start)
   * [Testing](#testing)
 * [Motivation](#motivation)
 * [Contribution](#contribution)
@@ -81,7 +83,7 @@ For instance, you can start by creating a class annotated with `@Component`:
 
 ```java
 @Component
-public class MyKafkaStreams implements KafkaStreamsStarter {
+public class MyKafkaStreams extends KafkaStreamsStarter {
     @Override
     public void topology(StreamsBuilder streamsBuilder) {
         // Your topology here
@@ -151,7 +153,7 @@ To do this, the first step is to override the `dlqTopic` method and return the n
 
 ```java
 @Component
-public class MyKafkaStreams implements KafkaStreamsStarter {
+public class MyKafkaStreams extends KafkaStreamsStarter {
     @Override
     public void topology(StreamsBuilder streamsBuilder) { //... }
 
@@ -172,7 +174,7 @@ Here is a complete example of how to do this:
 
 ```java
 @Component
-public class MyKafkaStreams implements KafkaStreamsStarter {
+public class MyKafkaStreams extends KafkaStreamsStarter {
     @Override
     public void topology(StreamsBuilder streamsBuilder) {
         KStream myStream = streamsBuilder
@@ -252,6 +254,26 @@ The Kstreamplify library provides several REST endpoints, which are listed below
 - `GET /liveness`: This endpoint is used as a liveness probe for Kubernetes deployment.
 - `GET /topology`: This endpoint returns the Kafka Streams topology as JSON.
 
+### Hooks
+
+Kstreamplify offers the flexibility to execute custom code through hooks. These hooks can be defined by overriding specific methods.
+
+#### On Start
+
+The `On Start` hook allows you to execute code right after the Kafka Streams instantiation. It provides the Kafka Streams instance as a parameter.
+
+```java
+@Component
+public class MyKafkaStreams extends KafkaStreamsStarter {
+    @Override
+    public void onStart(KafkaStreams kafkaStreams) {
+        // Your code here
+    }
+}
+```
+
+You can use this hook to perform any custom initialization or setup tasks for your Kafka Streams application.
+
 ### Testing
 
 For testing, you can create a test class that implements `KafkaStreamsStarterTest` and override the `topology` method. Then, apply the topology of your Kafka Streams on the given `streamsBuilders`.
diff --git a/kstreamplify-core-test/pom.xml b/kstreamplify-core-test/pom.xml
index d34ddb26..5810895b 100644
--- a/kstreamplify-core-test/pom.xml
+++ b/kstreamplify-core-test/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>kstreamplify</artifactId>
         <groupId>com.michelin</groupId>
-        <version>0.1.0-SNAPSHOT</version>
+        <version>0.1.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
@@ -15,7 +15,7 @@
         <dependency>
             <groupId>com.michelin</groupId>
             <artifactId>kstreamplify-core</artifactId>
-            <version>0.1.0-SNAPSHOT</version>
+            <version>0.1.1-SNAPSHOT</version>
         </dependency>
diff --git a/kstreamplify-core/pom.xml b/kstreamplify-core/pom.xml
index 7b9e44c1..da09e5ea 100644
--- a/kstreamplify-core/pom.xml
+++ b/kstreamplify-core/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>kstreamplify</artifactId>
         <groupId>com.michelin</groupId>
-        <version>0.1.0-SNAPSHOT</version>
+        <version>0.1.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java
index 7c23086a..4206fe40 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java
@@ -8,6 +8,7 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.common.KafkaException;
 
 import java.util.Map;
 
@@ -37,10 +38,17 @@ public DeserializationHandlerResponse handle(ProcessorContext processorContext,
             var builder = KafkaError.newBuilder();
             enrichWithException(builder, consumptionException, consumerRecord.key(), consumerRecord.value())
                     .setContextMessage("An exception occurred during the stream internal deserialization")
-                    .setOffset(processorContext.offset())
-                    .setPartition(processorContext.partition())
-                    .setTopic(processorContext.topic());
-            getProducer().send(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), consumerRecord.key(), builder.build())).get();
+                    .setOffset(consumerRecord.offset())
+                    .setPartition(consumerRecord.partition())
+                    .setTopic(consumerRecord.topic());
+
+            boolean isCausedByKafka = consumptionException.getCause() instanceof KafkaException;
+            // If the cause is a KafkaException (e.g. a SerializationException), or there is no cause at all (see Throwable.getCause),
+            // the record is a poison pill: send it to the DLQ and keep processing.
+            if (isCausedByKafka || consumptionException.getCause() == null) {
+                getProducer().send(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), consumerRecord.key(), builder.build())).get();
+                return DeserializationHandlerResponse.CONTINUE;
+            }
         } catch (InterruptedException ie) {
             log.error("Interruption while sending the deserialization exception {} for key {}, value {} and topic {} to DLQ topic {}", consumptionException,
                     consumerRecord.key(), consumerRecord.value(), consumerRecord.topic(), KafkaStreamsExecutionContext.getDlqTopicName(), ie);
@@ -48,10 +56,11 @@ public DeserializationHandlerResponse handle(ProcessorContext processorContext,
         } catch (Exception e) {
             log.error("Cannot send the deserialization exception {} for key {}, value {} and topic {} to DLQ topic {}", consumptionException,
                     consumerRecord.key(), consumerRecord.value(), consumerRecord.topic(), KafkaStreamsExecutionContext.getDlqTopicName(), e);
-            return DeserializationHandlerResponse.FAIL;
         }
 
-        return DeserializationHandlerResponse.CONTINUE;
+        // Only environment-related exceptions reach this point (e.g. UnknownHostException or TimeoutException,
+        // typically because the schema registry URL is unreachable), so fail rather than skipping the record.
+        return DeserializationHandlerResponse.FAIL;
     }
 
     /**
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java
index 964575a2..c16bbcd6 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java
@@ -3,17 +3,15 @@
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 
-import static org.apache.commons.lang3.StringUtils.EMPTY;
-
 /**
  * The Kafka Streams starter interface
  */
-public interface KafkaStreamsStarter {
+public abstract class KafkaStreamsStarter {
     /**
      * Define the topology of the Kafka Streams
      * @param streamsBuilder The streams builder
      */
-    void topology(StreamsBuilder streamsBuilder);
+    public abstract void topology(StreamsBuilder streamsBuilder);
 
     /**
      * Define the dead letter queue (DLQ) topic
@@ -21,11 +19,11 @@ public interface KafkaStreamsStarter {
      *
      * @return The dead letter queue (DLQ) topic
      */
-    String dlqTopic();
+    public abstract String dlqTopic();
 
     /**
      * Define runnable code after the Kafka Streams startup
      * @param kafkaStreams The Kafka Streams instance
      */
-    default void onStart(KafkaStreams kafkaStreams) { }
+    public void onStart(KafkaStreams kafkaStreams) { }
 }
diff --git a/kstreamplify-spring-boot/pom.xml b/kstreamplify-spring-boot/pom.xml
index 0dff5c01..1d3b1542 100644
--- a/kstreamplify-spring-boot/pom.xml
+++ b/kstreamplify-spring-boot/pom.xml
@@ -5,7 +5,7 @@
    <parent>
        <artifactId>kstreamplify</artifactId>
        <groupId>com.michelin</groupId>
-        <version>0.1.0-SNAPSHOT</version>
+        <version>0.1.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
@@ -34,7 +34,7 @@
        <dependency>
            <groupId>com.michelin</groupId>
            <artifactId>kstreamplify-core</artifactId>
-            <version>0.1.0-SNAPSHOT</version>
+            <version>0.1.1-SNAPSHOT</version>
        </dependency>
diff --git a/pom.xml b/pom.xml
index c5858832..781e87c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
    <groupId>com.michelin</groupId>
    <artifactId>kstreamplify</artifactId>
-    <version>0.1.0-SNAPSHOT</version>
+    <version>0.1.1-SNAPSHOT</version>
    <name>kstreamplify</name>
    <description>Kstreamplify is a Java library that brings new features on top of Kafka Streams.</description>
    <url>https://github.com/michelin/kstreamplify</url>
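---

For context on the `implements` to `extends` migration above, here is a minimal sketch of what a downstream application looks like once `KafkaStreamsStarter` is an abstract class with the new `onStart` hook. The class name, topic names, and the uncaught-exception handler used in `onStart` are illustrative examples, not part of this patch:

```java
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.springframework.stereotype.Component;

@Component
public class MyKafkaStreams extends KafkaStreamsStarter { // previously "implements KafkaStreamsStarter"
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        // Topology definition is unchanged by this patch: read, transform, write
        streamsBuilder.stream("INPUT_TOPIC").to("OUTPUT_TOPIC");
    }

    @Override
    public String dlqTopic() {
        // Hypothetical DLQ topic name; records that fail deserialization are produced here
        return "MY_DLQ_TOPIC";
    }

    @Override
    public void onStart(KafkaStreams kafkaStreams) {
        // New hook: runs right after the KafkaStreams instance is created
        kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
    }
}
```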
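`DlqDeserializationExceptionHandler` is a standard Kafka Streams `DeserializationExceptionHandler`, so its new poison-pill behavior can also be exercised outside Kstreamplify's own property bootstrapping. The sketch below assumes you wire the handler manually through plain Streams properties; the application id and bootstrap servers are placeholders:

```java
import com.michelin.kstreamplify.error.DlqDeserializationExceptionHandler;
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class DlqHandlerProperties {
    public static Properties build() {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-application");      // placeholder
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        // With this patch, poison pills (KafkaException causes or no cause) go to the DLQ and processing continues,
        // while environmental errors such as an unreachable schema registry now make the handler return FAIL.
        properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                DlqDeserializationExceptionHandler.class);
        return properties;
    }
}
```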