From 63a8cc4230f1cdf0ce54ebf6b82dbff9023a2302 Mon Sep 17 00:00:00 2001 From: Ralf Ueberfuhr Date: Fri, 28 Jun 2024 11:48:27 +0200 Subject: [PATCH] Consume customer events in statistics service. --- statistics-service-provider/pom.xml | 4 ++ .../kafka/CustomJsonDeserializer.java | 23 +++++++ .../kafka/CustomerEventListener.java | 62 +++++++++++++++++++ .../statistics/kafka/CustomerEventRecord.java | 10 +++ .../statistics/kafka/CustomerRecord.java | 9 +++ .../statistics/kafka/KafkaConstants.java | 10 +++ .../src/main/resources/application.yml | 9 +++ 7 files changed, 127 insertions(+) create mode 100644 statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomJsonDeserializer.java create mode 100644 statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java create mode 100644 statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventRecord.java create mode 100644 statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerRecord.java create mode 100644 statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/KafkaConstants.java diff --git a/statistics-service-provider/pom.xml b/statistics-service-provider/pom.xml index 05b21c1..5ce06ab 100644 --- a/statistics-service-provider/pom.xml +++ b/statistics-service-provider/pom.xml @@ -46,6 +46,10 @@ org.springframework.boot spring-boot-starter-data-jpa + + org.springframework.kafka + spring-kafka + com.h2database h2 diff --git a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomJsonDeserializer.java b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomJsonDeserializer.java new file mode 100644 index 0000000..5c957a2 --- /dev/null +++ b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomJsonDeserializer.java @@ -0,0 +1,23 @@ +package de.sample.schulung.statistics.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.springframework.kafka.support.JacksonUtils; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +public class CustomJsonDeserializer extends JsonDeserializer { + + private static ObjectMapper createCustomObjectMapper() { + final var result = JacksonUtils.enhancedObjectMapper(); + result.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); + result.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + result.registerModule(new JavaTimeModule()); + return result; + } + + public CustomJsonDeserializer() { + super(createCustomObjectMapper()); + } +} \ No newline at end of file diff --git a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java new file mode 100644 index 0000000..ba3e3d0 --- /dev/null +++ b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java @@ -0,0 +1,62 @@ +package de.sample.schulung.statistics.kafka; + +import de.sample.schulung.statistics.domain.Customer; +import de.sample.schulung.statistics.domain.CustomersService; +import jakarta.validation.ValidationException; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +@RequiredArgsConstructor +public class CustomerEventListener { + + private final CustomersService customersService; + + @KafkaListener( + topics = KafkaConstants.CUSTOMER_EVENTS_TOPIC + ) + public void consume( + @Payload CustomerEventRecord record, + @Header(KafkaHeaders.RECEIVED_PARTITION) String partition, + @Header(KafkaHeaders.OFFSET) int offset + ) { + log.info( + "Received record {} {} (Partition: {}, Offset: {})", + record.eventType(), + record.uuid(), + partition, + offset + ); + if(record.eventType() == null) { + return; + } + switch (record.eventType()) { + case "created": + case "updated": + if("active".equals(record.customer().state())) { + var customer = Customer + .builder() + .uuid(record.uuid()) + .dateOfBirth(record.customer().birthdate()) + .build(); + customersService.saveCustomer(customer); + } else { + // TODO wenn "created" / nicht "active" -> kein DB-Zugriff + customersService.deleteCustomer(record.uuid()); + } + break; + case "deleted": + customersService.deleteCustomer(record.uuid()); + break; + default: + throw new ValidationException(); + } + } + +} diff --git a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventRecord.java b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventRecord.java new file mode 100644 index 0000000..66ea608 --- /dev/null +++ b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventRecord.java @@ -0,0 +1,10 @@ +package de.sample.schulung.statistics.kafka; + +import java.util.UUID; + +public record CustomerEventRecord( + String eventType, + UUID uuid, + CustomerRecord customer +) { +} diff --git a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerRecord.java b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerRecord.java new file mode 100644 index 0000000..cb88e83 --- /dev/null +++ b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerRecord.java @@ -0,0 +1,9 @@ +package de.sample.schulung.statistics.kafka; + +import java.time.LocalDate; + +public record CustomerRecord( + LocalDate birthdate, + String state +) { +} diff --git a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/KafkaConstants.java b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/KafkaConstants.java new file mode 100644 index 0000000..d4958b0 --- /dev/null +++ b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/KafkaConstants.java @@ -0,0 +1,10 @@ +package de.sample.schulung.statistics.kafka; + +import lombok.experimental.UtilityClass; + +@UtilityClass +public class KafkaConstants { + + public final String CUSTOMER_EVENTS_TOPIC = "customer-events"; + +} diff --git a/statistics-service-provider/src/main/resources/application.yml b/statistics-service-provider/src/main/resources/application.yml index b657716..b9a76f2 100644 --- a/statistics-service-provider/src/main/resources/application.yml +++ b/statistics-service-provider/src/main/resources/application.yml @@ -16,6 +16,15 @@ spring: console: path: /db enabled: true + kafka: + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092} + consumer: + value-deserializer: de.sample.schulung.statistics.kafka.CustomJsonDeserializer + properties: + "[spring.json.use.type.headers]": false + "[spring.json.value.default.type]": de.sample.schulung.statistics.kafka.CustomerEventRecord + group-id: customer-statistics + auto-offset-reset: earliest application: init-sample-data: enabled: true \ No newline at end of file