From 9daa18a5bd6b9e566bff8c1e04edca41d06618fc Mon Sep 17 00:00:00 2001 From: clvacher Date: Wed, 6 Sep 2023 11:31:42 +0200 Subject: [PATCH 1/6] Switch to abstract implementation of starter (#86) Co-authored-by: FP17639 --- README.md | 6 +++--- kstreamplify-core-test/pom.xml | 4 ++-- kstreamplify-core/pom.xml | 2 +- .../kstreamplify/initializer/KafkaStreamsStarter.java | 8 ++++---- kstreamplify-spring-boot/pom.xml | 4 ++-- pom.xml | 2 +- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index eb5a2b20..9dd7a980 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,7 @@ For instance, you can start by creating a class annotated with `@Component`: ```java @Component -public class MyKafkaStreams implements KafkaStreamsStarter { +public class MyKafkaStreams extends KafkaStreamsStarter { @Override public void topology(StreamsBuilder streamsBuilder) { // Your topology here @@ -151,7 +151,7 @@ To do this, the first step is to override the `dlqTopic` method and return the n ```java @Component -public class MyKafkaStreams implements KafkaStreamsStarter { +public class MyKafkaStreams extends KafkaStreamsStarter { @Override public void topology(StreamsBuilder streamsBuilder) { //... } @@ -172,7 +172,7 @@ Here is a complete example of how to do this: ```java @Component -public class MyKafkaStreams implements KafkaStreamsStarter { +public class MyKafkaStreams extends KafkaStreamsStarter { @Override public void topology(StreamsBuilder streamsBuilder) { KStream myStream = streamsBuilder diff --git a/kstreamplify-core-test/pom.xml b/kstreamplify-core-test/pom.xml index d34ddb26..5810895b 100644 --- a/kstreamplify-core-test/pom.xml +++ b/kstreamplify-core-test/pom.xml @@ -5,7 +5,7 @@ kstreamplify com.michelin - 0.1.0-SNAPSHOT + 0.1.1-SNAPSHOT 4.0.0 @@ -15,7 +15,7 @@ com.michelin kstreamplify-core - 0.1.0-SNAPSHOT + 0.1.1-SNAPSHOT diff --git a/kstreamplify-core/pom.xml b/kstreamplify-core/pom.xml index 7b9e44c1..da09e5ea 100644 --- a/kstreamplify-core/pom.xml +++ b/kstreamplify-core/pom.xml @@ -5,7 +5,7 @@ kstreamplify com.michelin - 0.1.0-SNAPSHOT + 0.1.1-SNAPSHOT 4.0.0 diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java index 964575a2..c70a35fb 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java @@ -8,12 +8,12 @@ /** * The Kafka Streams starter interface */ -public interface KafkaStreamsStarter { +public abstract class KafkaStreamsStarter { /** * Define the topology of the Kafka Streams * @param streamsBuilder The streams builder */ - void topology(StreamsBuilder streamsBuilder); + public abstract void topology(StreamsBuilder streamsBuilder); /** *

Define the dead letter queue (DLQ) topic

@@ -21,11 +21,11 @@ public interface KafkaStreamsStarter { * * @return The dead letter queue (DLQ) topic */ - String dlqTopic(); + public abstract String dlqTopic(); /** * Define runnable code after the Kafka Streams startup * @param kafkaStreams The Kafka Streams instance */ - default void onStart(KafkaStreams kafkaStreams) { } + public void onStart(KafkaStreams kafkaStreams) { } } diff --git a/kstreamplify-spring-boot/pom.xml b/kstreamplify-spring-boot/pom.xml index 0dff5c01..1d3b1542 100644 --- a/kstreamplify-spring-boot/pom.xml +++ b/kstreamplify-spring-boot/pom.xml @@ -5,7 +5,7 @@ kstreamplify com.michelin - 0.1.0-SNAPSHOT + 0.1.1-SNAPSHOT 4.0.0 @@ -34,7 +34,7 @@ com.michelin kstreamplify-core - 0.1.0-SNAPSHOT + 0.1.1-SNAPSHOT diff --git a/pom.xml b/pom.xml index c8fcba5c..41fe3e30 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.michelin kstreamplify - 0.1.0-SNAPSHOT + 0.1.1-SNAPSHOT kstreamplify Kstreamplify is a Java library that brings new features on top of Kafka Streams. https://github.com/michelin/kstreamplify From 75680fae88a78831d6889b1f6f87501973503943 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20GREFFIER?= Date: Wed, 6 Sep 2023 11:36:33 +0200 Subject: [PATCH 2/6] Add on start documentation (#85) * Add on start documentation * Add on start entry * Replace implements by extends --- README.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/README.md b/README.md index 9dd7a980..a1976907 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,8 @@ Kstreamplify is a Java library that brings new features on top of Kafka Streams. * [Production and Deserialization](#production-and-deserialization) * [Avro Schema](#avro-schema) * [REST Endpoints](#rest-endpoints) + * [Hooks](#hooks) + * [On Start](#on-start) * [Testing](#testing) * [Motivation](#motivation) * [Contribution](#contribution) @@ -252,6 +254,26 @@ The Kstreamplify library provides several REST endpoints, which are listed below - `GET /liveness`: This endpoint is used as a liveness probe for Kubernetes deployment. - `GET /topology`: This endpoint returns the Kafka Streams topology as JSON. +### Hooks + +Kstreamplify offers the flexibility to execute custom code through hooks. These hooks can be defined by overriding specific methods. + +#### On Start + +The `On Start` hook allows you to execute code right after the Kafka Streams instantiation. It provides the Kafka Streams instance as a parameter. + +```java +@Component +public class MyKafkaStreams extends KafkaStreamsStarter { + @Override + public void onStart(KafkaStreams kafkaStreams) { + // Your code here + } +} +``` + +You can use this hook to perform any custom initialization or setup tasks for your Kafka Streams application. + ### Testing For testing, you can create a test class that implements `KafkaStreamsStarterTest` and override the `topology` method. Then, apply the topology of your Kafka Streams on the given `streamsBuilders`. From 55d546b6fe7119e815865fcaea515eb912a628f9 Mon Sep 17 00:00:00 2001 From: clvacher Date: Wed, 6 Sep 2023 15:48:50 +0200 Subject: [PATCH 3/6] fix by Marie-Laure Momplot, record contains info and not the context (#89) Co-authored-by: FP17639 --- .../error/DlqDeserializationExceptionHandler.java | 6 +++--- .../kstreamplify/initializer/KafkaStreamsStarter.java | 2 -- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java index faeb7023..89cf7aa0 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java @@ -37,9 +37,9 @@ public DeserializationHandlerResponse handle(ProcessorContext processorContext, var builder = KafkaError.newBuilder(); enrichWithException(builder, consumptionException, consumerRecord.key(), consumerRecord.value()) .setContextMessage("An exception occurred during the stream internal deserialization") - .setOffset(processorContext.offset()) - .setPartition(processorContext.partition()) - .setTopic(processorContext.topic()); + .setOffset(consumerRecord.offset()) + .setPartition(consumerRecord.partition()) + .setTopic(consumerRecord.topic()); producer.send(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), consumerRecord.key(), builder.build())).get(); } catch (InterruptedException ie) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java index c70a35fb..c16bbcd6 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java @@ -3,8 +3,6 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; -import static org.apache.commons.lang3.StringUtils.EMPTY; - /** * The Kafka Streams starter interface */ From f8cd553bfb1857af7e8fc808610cf213e0593276 Mon Sep 17 00:00:00 2001 From: Marie-Laure Momplot Date: Wed, 6 Sep 2023 18:04:00 +0200 Subject: [PATCH 4/6] Change way of working on DlqDeserializationExceptionHandler. (#75) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * change way of working on deserializationExceptionHandler : catch poison pill ☠ and CONTINUE and FAIL only if we had network issue (schema registry timeout for example) 💻 * fix compile errors 🙄 * change comment * remove useless early return --------- Co-authored-by: Marie-Laure Momplot Co-authored-by: FP17639 --- .../error/DlqDeserializationExceptionHandler.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java index 89cf7aa0..3c1cc745 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java @@ -8,6 +8,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.common.KafkaException; import java.util.Map; @@ -41,7 +42,13 @@ public DeserializationHandlerResponse handle(ProcessorContext processorContext, .setPartition(consumerRecord.partition()) .setTopic(consumerRecord.topic()); - producer.send(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), consumerRecord.key(), builder.build())).get(); + boolean isCausedByKafka = consumptionException.getCause() instanceof KafkaException; + //If the cause of this exception is a KafkaException and if getCause == sourceException (see Throwable.getCause - including SerializationException) + //use to handle poison pill => sent message into dlq and continue our life. + if(isCausedByKafka || consumptionException.getCause() == null) { + producer.send(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), consumerRecord.key(), builder.build())).get(); + return DeserializationHandlerResponse.CONTINUE; + } } catch (InterruptedException ie) { log.error("Interruption while sending the deserialization exception {} for key {}, value {} and topic {} to DLQ topic {}", consumptionException, consumerRecord.key(), consumerRecord.value(), consumerRecord.topic(), KafkaStreamsExecutionContext.getDlqTopicName(), ie); @@ -49,10 +56,11 @@ public DeserializationHandlerResponse handle(ProcessorContext processorContext, } catch (Exception e) { log.error("Cannot send the deserialization exception {} for key {}, value {} and topic {} to DLQ topic {}", consumptionException, consumerRecord.key(), consumerRecord.value(), consumerRecord.topic(), KafkaStreamsExecutionContext.getDlqTopicName(), e); - return DeserializationHandlerResponse.FAIL; } - return DeserializationHandlerResponse.CONTINUE; + // here we only have exception like UnknownHostException for example or TimeoutException ... + // situation example: we cannot ask schema registry because the url is unavailable + return DeserializationHandlerResponse.FAIL; } /** From f636850fdfaf6a7918f8c30926a8b3f4c1c5f894 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 12 Sep 2023 09:08:29 +0200 Subject: [PATCH 5/6] Bump crazy-max/ghaction-import-gpg from 5 to 6 (#95) Bumps [crazy-max/ghaction-import-gpg](https://github.com/crazy-max/ghaction-import-gpg) from 5 to 6. - [Release notes](https://github.com/crazy-max/ghaction-import-gpg/releases) - [Commits](https://github.com/crazy-max/ghaction-import-gpg/compare/v5...v6) --- updated-dependencies: - dependency-name: crazy-max/ghaction-import-gpg dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/tag.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tag.yml b/.github/workflows/tag.yml index ad138a9e..7786aa55 100644 --- a/.github/workflows/tag.yml +++ b/.github/workflows/tag.yml @@ -25,7 +25,7 @@ jobs: cache: maven - name: Import GPG key - uses: crazy-max/ghaction-import-gpg@v5 + uses: crazy-max/ghaction-import-gpg@v6 with: gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }} git_user_signingkey: true From 396dce4a1bd5053a8adf99c771568efa04e119cc Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 12 Sep 2023 09:08:36 +0200 Subject: [PATCH 6/6] Bump mikepenz/action-junit-report from 3 to 4 (#94) Bumps [mikepenz/action-junit-report](https://github.com/mikepenz/action-junit-report) from 3 to 4. - [Release notes](https://github.com/mikepenz/action-junit-report/releases) - [Commits](https://github.com/mikepenz/action-junit-report/compare/v3...v4) --- updated-dependencies: - dependency-name: mikepenz/action-junit-report dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/on_pull_request.yml | 2 +- .github/workflows/on_push_main.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/on_pull_request.yml b/.github/workflows/on_pull_request.yml index bce9442f..9cd5c772 100644 --- a/.github/workflows/on_pull_request.yml +++ b/.github/workflows/on_pull_request.yml @@ -28,7 +28,7 @@ jobs: - name: Publish Test Report if: always() - uses: mikepenz/action-junit-report@v3 + uses: mikepenz/action-junit-report@v4 with: report_paths: '**/target/surefire-reports/TEST-*.xml' diff --git a/.github/workflows/on_push_main.yml b/.github/workflows/on_push_main.yml index b6cf53ee..accf452e 100644 --- a/.github/workflows/on_push_main.yml +++ b/.github/workflows/on_push_main.yml @@ -36,7 +36,7 @@ jobs: - name: Publish Test Report if: always() - uses: mikepenz/action-junit-report@v3 + uses: mikepenz/action-junit-report@v4 with: report_paths: '**/target/surefire-reports/TEST-*.xml'