From 791a578b7a17c5d6927feb52fbb7b62bdc77a2a8 Mon Sep 17 00:00:00 2001
From: Ralf Ueberfuhr <ralf.ueberfuhr@ars.de>
Date: Fri, 28 Jun 2024 14:29:15 +0200
Subject: [PATCH] Use Retry

---
 .../statistics/kafka/CustomerEventListener.java   | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java
index 3331763..dd30993 100644
--- a/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java
+++ b/statistics-service-provider/src/main/java/de/sample/schulung/statistics/kafka/CustomerEventListener.java
@@ -2,16 +2,21 @@
 
 import de.sample.schulung.statistics.domain.Customer;
 import de.sample.schulung.statistics.domain.CustomersService;
+import jakarta.validation.Valid;
 import jakarta.validation.ValidationException;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.RetryableTopic;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.messaging.handler.annotation.Header;
 import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.retry.annotation.Backoff;
 import org.springframework.stereotype.Component;
+import org.springframework.validation.annotation.Validated;
 
+@Validated
 @Component
 @Slf4j
 @RequiredArgsConstructor
@@ -19,11 +24,19 @@ public class CustomerEventListener {
 
   private final CustomersService customersService;
 
+  // Versuch, danach 1 Versuch nach 1s, danach Versuch nach 2s, danach Versuch nach 4s
+  @RetryableTopic(
+    attempts = "4",
+    backoff = @Backoff(
+      delay = 1000L,
+      multiplier = 2
+    )
+  )
   @KafkaListener(
     topics = KafkaConstants.CUSTOMER_EVENTS_TOPIC
   )
   public void consume(
-    @Payload CustomerEventRecord record,
+    @Valid @Payload CustomerEventRecord record,
     @Header(KafkaHeaders.RECEIVED_PARTITION) String partition,
     @Header(KafkaHeaders.OFFSET) int offset,
     Acknowledgment  acknowledgment