From 7d177d143e5b2bdb07e80a6143611aed06130133 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lo=C3=AFc=20GREFFIER?=
Date: Wed, 11 Dec 2024 12:31:54 +0100
Subject: [PATCH] Add documentation about TopicWithSerde (#290)

---
 README.md | 177 ++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 126 insertions(+), 51 deletions(-)

diff --git a/README.md b/README.md
index 4026d1f..dbb14d8 100644
--- a/README.md
+++ b/README.md
@@ -38,14 +38,18 @@ need to do:
* [Production and Deserialization Errors](#production-and-deserialization-errors)
* [Avro Schema](#avro-schema)
* [Uncaught Exception Handler](#uncaught-exception-handler)
-* [Kubernetes](#kubernetes)
-* [Hooks](#hooks)
-  * [On Start](#on-start)
+* [Web Services](#web-services)
+  * [Topology](#topology)
+  * [State Stores](#state-stores)
+  * [Kubernetes](#kubernetes)
+* [TopicWithSerde API](#topicwithserde-api)
+  * [Declaration](#declaration)
+  * [Prefix](#prefix)
* [Interactive Queries](#interactive-queries)
-  * [Application Server Configuration](#application-server-configuration)
-  * [Web Services](#web-services)
+  * [Configuration](#configuration)
  * [Services](#services)
-* [Topology](#topology)
+* [Hooks](#hooks)
+  * [On Start](#on-start)
* [Deduplication](#deduplication)
  * [By Key](#by-key)
  * [By Key and Value](#by-key-and-value)
@@ -99,8 +103,7 @@ To include the core Kstreamplify library in your project, add the following depe

[![javadoc](https://javadoc.io/badge2/com.michelin/kstreamplify-spring-boot/javadoc.svg?style=for-the-badge&)](https://javadoc.io/doc/com.michelin/kstreamplify-spring-boot)

-If you are using Spring Boot, you can integrate Kstreamplify with your Spring Boot application by adding the following
-dependency:
+For Spring Boot applications, use the following dependency:

```xml
<dependency>
@@ -134,8 +137,7 @@ initialization of Kafka Streams for you.

### Create your first Kstreamplify application

-To create a Kstreamplify application, define a `KafkaStreamsStarter` bean within your Spring Boot context and
-override the `KafkaStreamsStarter#topology()` method:
+Define a `KafkaStreamsStarter` bean within your Spring Boot context and override the `KafkaStreamsStarter#topology()` method:

```java
@Component
@@ -169,7 +171,7 @@ kafka:
      avro.remove.java.properties: true
```

-Note that all the Kafka Streams properties have been moved under `kafka.properties`.
+Note that all the Kafka Streams properties are set under `kafka.properties`.

## Avro Serializer and Deserializer
@@ -223,10 +225,7 @@ public class MyKafkaStreams extends KafkaStreamsStarter {

### Processing Errors

-Kstreamplify provides utilities to handle errors that occur during the processing of records and route them to a DLQ topic.
-
-The processing result is encapsulated and marked as either success or failure.
-Failed records will be routed to the DLQ topic, while successful records will still be up for further processing.
+To handle processing errors and route them to the DLQ topic, you can use the `ProcessingResult` class.

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
@@ -262,7 +261,7 @@ The map values processing returns a `ProcessingResult<V, V2>`, where:

- The first parameter is the type of the new value after a successful transformation.
- The second parameter is the type of the current value for which the transformation failed.
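
+For instance, mapping the values of a `KStream<String, String>` yields a `KStream<String, ProcessingResult<String, String>>`. The following is a minimal, hypothetical sketch using the `success` and `fail` helpers described below; the `stream` variable and the uppercase mapping are illustrative only, not part of the library:
+
+```java
+KStream<String, ProcessingResult<String, String>> processed = stream
+    .mapValues(value -> {
+        try {
+            // Successful transformation: wrap the new value
+            return ProcessingResult.success(value.toUpperCase());
+        } catch (Exception e) {
+            // Failed transformation: keep the failed value and a context message
+            // so the record can be routed to the DLQ topic
+            return ProcessingResult.fail(e, value, "Something bad happened...");
+        }
+    });
+```
+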
-You can use the following to mark the result as successful:
+Use the following to mark the result as successful:

```java
ProcessingResult.success(value);
@@ -274,13 +273,13 @@
Or the following in a catch clause to mark the result as failed:

```java
ProcessingResult.fail(e, value, "Something bad happened...");
```

-The stream of `ProcessingResult` needs to be lightened of the failed records by sending them to the DLQ topic.
-This is done by invoking the `TopologyErrorHandler#catchErrors()` method.
+Pass the stream to `TopologyErrorHandler#catchErrors()` to route the failed records to the DLQ topic.
A healthy stream is then returned and can be further processed.

### Production and Deserialization Errors

-Kstreamplify provides production and deserialization handlers that send errors to the DLQ topic.
+Production and deserialization handlers are provided to send errors to the DLQ topic.
+Add the following properties to your `application.yml` file:

```yml
kafka:
@@ -296,9 +295,9 @@
available [here](https://github.com/michelin/kstreamplify/blob/main/kstreamplify

### Uncaught Exception Handler

-Kstreamplify defines a default uncaught exception handler that catches all uncaught exceptions and shuts down the client.
+The default uncaught exception handler catches all uncaught exceptions and shuts down the client.

-If you want to override this behavior, you can override the `KafkaStreamsStarter#uncaughtExceptionHandler()` method and return your own
+To change this behavior, override the `KafkaStreamsStarter#uncaughtExceptionHandler()` method and return your own
uncaught exception handler.

```java
@Override
public StreamsUncaughtExceptionHandler uncaughtExceptionHandler() {
@@ -310,11 +309,38 @@
}
```

-## Kubernetes
+## Web Services
+
+Kstreamplify provides web services on top of your Kafka Streams application.
+They are available through the [Swagger UI](#swagger).
+
+### Topology
+
+The topology endpoint returns the Kafka Streams topology description. It is available at `/topology` by default.
+
+The path can be customized by setting the following property:
+
+```yml
+topology:
+  path: custom-topology
+```
+
+### State Stores
+
+A list of endpoints to query the state stores of your Kafka Streams application is available.
+It uses [interactive queries](https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html) and
+handles state stores being on different Kafka Streams instances by providing an [RPC layer](https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html#adding-an-rpc-layer-to-your-application).
+
+Here is the list of supported state store types:
+- Key-Value store
+- Window store
+
+Only state stores with String keys are supported.

-Kstreamplify provides readiness and liveness probes for Kubernetes deployment based on the Kafka Streams state.
+### Kubernetes

-By default, the endpoints are available at `/ready` and `/liveness`.
+Readiness and liveness endpoints provide probes for Kubernetes deployments based on the Kafka Streams state.
+They are available at `/ready` and `/liveness` by default.

The path can be customized by setting the following properties:

@@ -326,29 +352,86 @@ kubernetes:
    path: custom-liveness
```

-## Hooks
+## TopicWithSerde API

-Kstreamplify offers the flexibility to execute custom code through hooks.
+Kstreamplify provides an API called `TopicWithSerde` that unifies all the consumption and production points and
+deals with topics owned by different teams across different environments.

-### On Start

-The `On Start` hook allows you to execute code before starting the Kafka Streams instance.
+### Declaration

+Declare your consumption and production points in a separate class.
+It requires a topic name, a key SerDe, and a value SerDe. Here, `KafkaUser` stands in for an Avro-generated record class.

+```java
+public static TopicWithSerde<String, KafkaUser> inputTopic() {
+    return new TopicWithSerde<>(
+        "INPUT_TOPIC",
+        Serdes.String(),
+        SerdesUtils.<KafkaUser>getValueSerdes()
+    );
+}
+
+public static TopicWithSerde<String, KafkaUser> outputTopic() {
+    return new TopicWithSerde<>(
+        "OUTPUT_TOPIC",
+        Serdes.String(),
+        SerdesUtils.<KafkaUser>getValueSerdes()
+    );
+}
+```
+
+Use it in your topology:
+
+```java
+@Slf4j
+@Component
+public class MyKafkaStreams extends KafkaStreamsStarter {
+    @Override
+    public void topology(StreamsBuilder streamsBuilder) {
+        KStream<String, KafkaUser> stream = inputTopic().stream(streamsBuilder);
+        outputTopic().produce(stream);
+    }
+}
+```
+
+### Prefix
+
+The strength of the `TopicWithSerde` API is its ability to handle topics owned by different teams across different environments without altering the topology.
+It uses prefixes to differentiate teams and topic ownership.
+
+In the `application.yml` file, declare the prefixes in a `key: value` format:
+
+```yml
+kafka:
+  properties:
+    prefix:
+      self: staging.team1.
+      team2: staging.team2.
+      team3: staging.team3.
+```
+
+Include the prefix in the `TopicWithSerde` declaration:
+
+```java
+public static TopicWithSerde<String, KafkaUser> inputTopic() {
+    return new TopicWithSerde<>(
+        "INPUT_TOPIC",
+        "team2",
+        Serdes.String(),
+        SerdesUtils.<KafkaUser>getValueSerdes()
+    );
+}
+```
+
+The topic `staging.team2.INPUT_TOPIC` will be consumed when running the application with the staging `application.yml` file.
+
+When no prefix is specified, the `self` prefix is used by default.

## Interactive Queries

Kstreamplify wants to ease the use of [interactive queries](https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html) in Kafka Streams application.

-### Application Server Configuration
+### Configuration

The "[application.server](https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html#application-server)" property value is determined from different sources by the following order of priority:

@@ -363,19 +446,6 @@ kafka:
2. The value of a default environment variable named `APPLICATION_SERVER`.
3. `localhost`.

-### Web Services
-
-Kstreamplify provides web services to query the state stores of your Kafka Streams application.
-It handles state stores being on different Kafka Streams instances by providing an [RPC layer](https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html#adding-an-rpc-layer-to-your-application).
-
-Here is the list of supported state store types:
-- Key-Value store
-- Window store
-
-Only state stores with String keys are supported.
-
-You can use the [Swagger UI](#swagger) to see the available endpoints.
-
### Services

You can leverage the interactive queries services used by the web services layer to serve your own needs.

@@ -391,17 +461,22 @@ public class MyService {
    }
}
```

-## Topology
+## Hooks

-Kstreamplify provides a web service to retrieve the Kafka Streams topology as JSON.
+Kstreamplify offers the flexibility to execute custom code through hooks.

-By default, the endpoint is available at `/topology`.
+### On Start

-The path can be customized by setting the following properties:
+The `On Start` hook allows you to execute code before starting the Kafka Streams instance.

-```yml -topology: - path: custom-topology +```java +@Component +public class MyKafkaStreams extends KafkaStreamsStarter { + @Override + public void onStart(KafkaStreams kafkaStreams) { + // Do something before starting the Kafka Streams instance + } +} ``` ## Deduplication