Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

Commit

Permalink
Consume customer events in statistics service.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralf Ueberfuhr committed Jun 28, 2024
1 parent f548945 commit 63a8cc4
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 0 deletions.
4 changes: 4 additions & 0 deletions statistics-service-provider/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package de.sample.schulung.statistics.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.kafka.support.JacksonUtils;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class CustomJsonDeserializer extends JsonDeserializer<Object> {

private static ObjectMapper createCustomObjectMapper() {
final var result = JacksonUtils.enhancedObjectMapper();
result.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
result.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
result.registerModule(new JavaTimeModule());
return result;
}

public CustomJsonDeserializer() {
super(createCustomObjectMapper());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package de.sample.schulung.statistics.kafka;

import de.sample.schulung.statistics.domain.Customer;
import de.sample.schulung.statistics.domain.CustomersService;
import jakarta.validation.ValidationException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RequiredArgsConstructor
public class CustomerEventListener {

private final CustomersService customersService;

@KafkaListener(
topics = KafkaConstants.CUSTOMER_EVENTS_TOPIC
)
public void consume(
@Payload CustomerEventRecord record,
@Header(KafkaHeaders.RECEIVED_PARTITION) String partition,
@Header(KafkaHeaders.OFFSET) int offset
) {
log.info(
"Received record {} {} (Partition: {}, Offset: {})",
record.eventType(),
record.uuid(),
partition,
offset
);
if(record.eventType() == null) {
return;
}
switch (record.eventType()) {
case "created":
case "updated":
if("active".equals(record.customer().state())) {
var customer = Customer
.builder()
.uuid(record.uuid())
.dateOfBirth(record.customer().birthdate())
.build();
customersService.saveCustomer(customer);
} else {
// TODO wenn "created" / nicht "active" -> kein DB-Zugriff
customersService.deleteCustomer(record.uuid());
}
break;
case "deleted":
customersService.deleteCustomer(record.uuid());
break;
default:
throw new ValidationException();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package de.sample.schulung.statistics.kafka;

import java.util.UUID;

public record CustomerEventRecord(
String eventType,
UUID uuid,
CustomerRecord customer
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package de.sample.schulung.statistics.kafka;

import java.time.LocalDate;

public record CustomerRecord(
LocalDate birthdate,
String state
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package de.sample.schulung.statistics.kafka;

import lombok.experimental.UtilityClass;

@UtilityClass
public class KafkaConstants {

public final String CUSTOMER_EVENTS_TOPIC = "customer-events";

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ spring:
console:
path: /db
enabled: true
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
consumer:
value-deserializer: de.sample.schulung.statistics.kafka.CustomJsonDeserializer
properties:
"[spring.json.use.type.headers]": false
"[spring.json.value.default.type]": de.sample.schulung.statistics.kafka.CustomerEventRecord
group-id: customer-statistics
auto-offset-reset: earliest
application:
init-sample-data:
enabled: true

0 comments on commit 63a8cc4

Please sign in to comment.