# Add documentation about TopicWithSerde

loicgreffier committed Dec 11, 2024
* [Production and Deserialization Errors](#production-and-deserialization-errors)
* [Avro Schema](#avro-schema)
* [Uncaught Exception Handler](#uncaught-exception-handler)
* [Web Services](#web-services)
  * [Topology](#topology)
  * [State Stores](#state-stores)
  * [Kubernetes](#kubernetes)
* [TopicWithSerde API](#topicwithserde-api)
  * [Declaration](#declaration)
  * [Prefix](#prefix)
* [Interactive Queries](#interactive-queries)
  * [Configuration](#configuration)
  * [Services](#services)
* [Hooks](#hooks)
  * [On Start](#on-start)
* [Deduplication](#deduplication)
  * [By Key](#by-key)
  * [By Key and Value](#by-key-and-value)

[![javadoc](https://javadoc.io/badge2/com.michelin/kstreamplify-spring-boot/javadoc.svg?style=for-the-badge&)](https://javadoc.io/doc/com.michelin/kstreamplify-spring-boot)

For Spring Boot applications, use the following dependency:

```xml
<dependency>
    <groupId>com.michelin</groupId>
    <artifactId>kstreamplify-spring-boot</artifactId>
    <!-- Replace with the latest released version -->
    <version>${kstreamplify.version}</version>
</dependency>
```
This dependency handles the initialization of Kafka Streams for you.

### Create your first Kstreamplify application

Define a `KafkaStreamsStarter` bean within your Spring Boot context and override the `KafkaStreamsStarter#topology()` method:

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        // Build your topology here
    }
}
```

Then, declare your Kafka Streams properties in the `application.yml` file. For instance (a minimal configuration; adapt the values to your setup):

```yml
kafka:
  properties:
    bootstrap.servers: localhost:9092
    application.id: myKafkaStreams
    avro.remove.java.properties: true
```
Note that all the Kafka Streams properties are prefixed with `kafka.properties`.

## Avro Serializer and Deserializer

Kstreamplify eases the serialization and deserialization of Avro records. The `SerdesUtils` class creates the required SerDes (e.g., `SerdesUtils.getValueSerdes()`, used throughout the examples below).

### Processing Errors

To handle processing errors and route them to the DLQ topic, you can use the `ProcessingResult` class.

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, KafkaPerson> stream = streamsBuilder
            .stream("INPUT_TOPIC", Consumed.with(Serdes.String(), SerdesUtils.getValueSerdes()));

        TopologyErrorHandler
            .catchErrors(stream.mapValues(MyKafkaStreams::toUpperCase))
            .to("OUTPUT_TOPIC", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes()));
    }

    private static ProcessingResult<KafkaPerson, KafkaPerson> toUpperCase(KafkaPerson value) {
        try {
            value.setLastName(value.getLastName().toUpperCase());
            return ProcessingResult.success(value);
        } catch (Exception e) {
            return ProcessingResult.fail(e, value, "Something bad happened...");
        }
    }
}
```

The `mapValues` processing returns a `ProcessingResult<V, V2>`, where:
- The first parameter is the type of the new value after a successful transformation.
- The second parameter is the type of the current value for which the transformation failed.

Use the following to mark the result as successful:

```java
ProcessingResult.success(value);
```

Or the following in a catch clause to mark the result as failed:

```java
ProcessingResult.fail(e, value, "Something bad happened...");
```

Invoke `TopologyErrorHandler#catchErrors()` with the stream of `ProcessingResult<V, V2>` to route the failed records to the DLQ topic.
A healthy stream is then returned and can be further processed.
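
For instance (a minimal sketch; `stream` and the `toUpperCase` mapper are assumed from the example above):

```java
KStream<String, ProcessingResult<KafkaPerson, KafkaPerson>> processed =
    stream.mapValues(MyKafkaStreams::toUpperCase);

// Failed records are routed to the DLQ topic; only successful records are returned
KStream<String, KafkaPerson> healthyStream = TopologyErrorHandler.catchErrors(processed);
```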

### Production and Deserialization Errors

Production and deserialization handlers are provided to send errors to the DLQ topic.
Add the following properties to your `application.yml` file:

```yml
kafka:
  properties:
    default.production.exception.handler: com.michelin.kstreamplify.error.DlqProductionExceptionHandler
    default.deserialization.exception.handler: com.michelin.kstreamplify.error.DlqDeserializationExceptionHandler
```

### Uncaught Exception Handler

The default uncaught exception handler catches all uncaught exceptions and shuts down the client.

To change this behavior, override the `KafkaStreamsStarter#uncaughtExceptionHandler()` method and return your own uncaught exception handler.

```java
@Override
public StreamsUncaughtExceptionHandler uncaughtExceptionHandler() {
    // For example, replace the failed stream thread instead of shutting down the client
    return throwable -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
}
```

## Web Services

Kstreamplify provides web services on top of your Kafka Streams application.
They are available through the [Swagger UI](#swagger).

### Topology

The topology endpoint returns the Kafka Streams topology description. It is available at `/topology` by default.

The path can be customized by setting the following property:

```yml
topology:
path: custom-topology
```

### State Stores

Endpoints are available to query the state stores of your Kafka Streams application.
They use [interactive queries](https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html) and
handle state stores located on different Kafka Streams instances through an [RPC layer](https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html#adding-an-rpc-layer-to-your-application).

Here is the list of supported state store types:
- Key-Value store
- Window store

Only state stores with String keys are supported.
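
For example, a queryable key-value store with String keys can be materialized in your topology with the standard Kafka Streams API (the topic and store names below are illustrative):

```java
streamsBuilder
    .stream("INPUT_TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .count(Materialized.as("my-key-value-store"));
```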

### Kubernetes

Readiness and liveness endpoints provide probes for Kubernetes deployment based on the Kafka Streams state.
They are available at `/ready` and `/liveness` by default.
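
Under the hood, the probes are derived from the Kafka Streams state. Conceptually (a simplified sketch, not the library's exact logic; `kafkaStreams` is the running instance):

```java
// Ready once the instance is fully up and processing
boolean ready = kafkaStreams.state() == KafkaStreams.State.RUNNING;

// Alive as long as the instance has not reached a terminal state
boolean alive = kafkaStreams.state().isRunningOrRebalancing();
```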

The path can be customized by setting the following properties:

```yml
kubernetes:
  readiness:
    path: custom-readiness
  liveness:
    path: custom-liveness
```

## TopicWithSerde API

Kstreamplify provides an API called `TopicWithSerde` that unifies all the consumption and production points and
deals with topics being owned by different teams across different environments.

### Declaration

Declare your consumption and production points in a separate class.
It requires a topic name, a key SerDe, and a value SerDe.

```java
public static TopicWithSerde<String, KafkaPerson> inputTopic() {
    return new TopicWithSerde<>(
        "INPUT_TOPIC",
        Serdes.String(),
        SerdesUtils.getValueSerdes()
    );
}

public static TopicWithSerde<String, KafkaPerson> outputTopic() {
    return new TopicWithSerde<>(
        "OUTPUT_TOPIC",
        Serdes.String(),
        SerdesUtils.getValueSerdes()
    );
}
```

Use it in your topology:

```java
@Slf4j
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, KafkaPerson> stream = inputTopic().stream(streamsBuilder);
        outputTopic().produce(stream);
    }
}
```

### Prefix

The power of the `TopicWithSerde` API lies in handling topics owned by different teams across different environments without altering the topology.
It uses prefixes to differentiate topic ownership between teams.

In the `application.yml` file, declare the prefixes in a `key: value` format:

```yml
kafka:
  properties:
    prefix:
      self: staging.team1.
      team2: staging.team2.
      team3: staging.team3.
```

Include the prefix in the `TopicWithSerde` declaration:

```java
public static TopicWithSerde<String, KafkaPerson> inputTopic() {
    return new TopicWithSerde<>(
        "INPUT_TOPIC",
        "self",
        Serdes.String(),
        SerdesUtils.getValueSerdes()
    );
}
```

The topic `staging.team1.INPUT_TOPIC` will be consumed when running the application with the staging `application.yml` file.

When no prefix is specified, the `self` prefix is used by default.

## Interactive Queries

Kstreamplify aims to ease the use of [interactive queries](https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html) in Kafka Streams applications.

### Configuration

The "[application.server](https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html#application-server)" property value is determined from different sources by the following order of priority:

1. The value of an environment variable whose name is defined by the `ip.env.var.name` property:

```yml
kafka:
  properties:
    ip.env.var.name: MY_APPLICATION_PORT_HOST
```
2. The value of a default environment variable named `APPLICATION_SERVER`.
3. `localhost`.
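
Conceptually, the resolution boils down to the following (a simplified sketch, not the library's actual code; `ipEnvVarName` holds the variable name configured in step 1):

```java
String applicationServer = Optional
    .ofNullable(System.getenv(ipEnvVarName))                            // 1. Custom environment variable
    .or(() -> Optional.ofNullable(System.getenv("APPLICATION_SERVER"))) // 2. Default environment variable
    .orElse("localhost");                                               // 3. Fallback
```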


### Services

You can leverage the interactive queries services used by the web services layer to serve your own needs.
```java
@Component
public class MyService {
    // Inject the interactive queries service you need (key-value store service here)
    @Autowired
    KeyValueStoreService keyValueStoreService;
}
```
```

## Hooks

Kstreamplify offers the flexibility to execute custom code through hooks.

### On Start

The `On Start` hook allows you to execute code before starting the Kafka Streams instance.

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void onStart(KafkaStreams kafkaStreams) {
        // Do something before starting the Kafka Streams instance
    }
}
```

## Deduplication
