Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

Consume customer events in statistics service. #7

Merged
merged 1 commit into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions statistics-service-provider/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package de.sample.schulung.statistics.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.kafka.support.JacksonUtils;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class CustomJsonDeserializer extends JsonDeserializer<Object> {

private static ObjectMapper createCustomObjectMapper() {
final var result = JacksonUtils.enhancedObjectMapper();
result.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
result.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
result.registerModule(new JavaTimeModule());
return result;
}

public CustomJsonDeserializer() {
super(createCustomObjectMapper());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package de.sample.schulung.statistics.kafka;

import de.sample.schulung.statistics.domain.Customer;
import de.sample.schulung.statistics.domain.CustomersService;
import jakarta.validation.ValidationException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RequiredArgsConstructor
public class CustomerEventListener {

private final CustomersService customersService;

@KafkaListener(
topics = KafkaConstants.CUSTOMER_EVENTS_TOPIC
)
public void consume(
@Payload CustomerEventRecord record,
@Header(KafkaHeaders.RECEIVED_PARTITION) String partition,
@Header(KafkaHeaders.OFFSET) int offset
) {
log.info(
"Received record {} {} (Partition: {}, Offset: {})",
record.eventType(),
record.uuid(),
partition,
offset
);
if(record.eventType() == null) {
return;
}
switch (record.eventType()) {
case "created":
case "updated":
if("active".equals(record.customer().state())) {
var customer = Customer
.builder()
.uuid(record.uuid())
.dateOfBirth(record.customer().birthdate())
.build();
customersService.saveCustomer(customer);
} else {
// TODO wenn "created" / nicht "active" -> kein DB-Zugriff
customersService.deleteCustomer(record.uuid());
}
break;
case "deleted":
customersService.deleteCustomer(record.uuid());
break;
default:
throw new ValidationException();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package de.sample.schulung.statistics.kafka;

import java.util.UUID;

public record CustomerEventRecord(
String eventType,
UUID uuid,
CustomerRecord customer
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package de.sample.schulung.statistics.kafka;

import java.time.LocalDate;

public record CustomerRecord(
LocalDate birthdate,
String state
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package de.sample.schulung.statistics.kafka;

import lombok.experimental.UtilityClass;

@UtilityClass
public class KafkaConstants {

public final String CUSTOMER_EVENTS_TOPIC = "customer-events";

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ spring:
console:
path: /db
enabled: true
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
consumer:
value-deserializer: de.sample.schulung.statistics.kafka.CustomJsonDeserializer
properties:
"[spring.json.use.type.headers]": false
"[spring.json.value.default.type]": de.sample.schulung.statistics.kafka.CustomerEventRecord
group-id: customer-statistics
auto-offset-reset: earliest
application:
init-sample-data:
enabled: true
Loading