From f4df47b020fdda1ea3ca06f19aa2799efb9fdea5 Mon Sep 17 00:00:00 2001 From: Ralf Ueberfuhr Date: Fri, 28 Jun 2024 23:29:11 +0200 Subject: [PATCH] Fix DLT configuration. --- .../kafka/CustomJsonSerializer.java | 23 +++++++++++++++++++ .../statistics/kafka/CustomerEventRecord.java | 8 +++++++ .../statistics/kafka/CustomerRecord.java | 6 +++++ .../src/main/resources/application.yml | 8 +++++++ 4 files changed, 45 insertions(+) create mode 100644 statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomJsonSerializer.java diff --git a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomJsonSerializer.java b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomJsonSerializer.java new file mode 100644 index 0000000..d24c210 --- /dev/null +++ b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomJsonSerializer.java @@ -0,0 +1,23 @@ +package de.sample.schulung.statistics.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.springframework.kafka.support.JacksonUtils; +import org.springframework.kafka.support.serializer.JsonSerializer; + +public class CustomJsonSerializer extends JsonSerializer { + + private static ObjectMapper createCustomObjectMapper() { + final var result = JacksonUtils.enhancedObjectMapper(); + result.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); + result.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + result.registerModule(new JavaTimeModule()); + return result; + } + + public CustomJsonSerializer() { + super(createCustomObjectMapper()); + } +} \ No newline at end of file diff --git a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventRecord.java b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventRecord.java index 66ea608..33be3b2 100644 --- a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventRecord.java +++ b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventRecord.java @@ -1,10 +1,18 @@ package de.sample.schulung.statistics.kafka; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Pattern; + import java.util.UUID; public record CustomerEventRecord( + @NotNull + @Pattern(regexp = "created|replaced|deleted") String eventType, + @NotNull UUID uuid, + @Valid CustomerRecord customer ) { } diff --git a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerRecord.java b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerRecord.java index cb88e83..6b36636 100644 --- a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerRecord.java +++ b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerRecord.java @@ -1,9 +1,15 @@ package de.sample.schulung.statistics.kafka; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Pattern; + import java.time.LocalDate; public record CustomerRecord( + @NotNull LocalDate birthdate, + @NotNull + @Pattern(regexp = "active|locked|disabled") String state ) { } diff --git a/statistics-service-provider/src/main/resources/application.yml b/statistics-service-provider/src/main/resources/application.yml index b6eb66a..9186474 100644 --- a/statistics-service-provider/src/main/resources/application.yml +++ b/statistics-service-provider/src/main/resources/application.yml @@ -18,6 +18,14 @@ spring: enabled: true kafka: bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092} + producer: #DTL + # key-serializer: org.apache.kafka.common.serialization.UUIDSerializer + key-serializer: org.apache.kafka.common.serialization.StringSerializer + # https://docs.spring.io/spring-kafka/reference/kafka/serdes.html#json-serde (Jackson) + value-serializer: de.sample.schulung.statistics.kafka.CustomJsonSerializer + properties: + # do not serialize the class name into the message + "[spring.json.add.type.headers]": false consumer: value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer #de.sample.schulung.statistics.kafka.CustomJsonDeserializer