Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

Commit

Permalink
Use Retry
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralf Ueberfuhr committed Jun 28, 2024
1 parent e16f18d commit 791a578
Showing 1 changed file with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,41 @@

import de.sample.schulung.statistics.domain.Customer;
import de.sample.schulung.statistics.domain.CustomersService;
import jakarta.validation.Valid;
import jakarta.validation.ValidationException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;

@Validated
@Component
@Slf4j
@RequiredArgsConstructor
public class CustomerEventListener {

private final CustomersService customersService;

// Versuch, danach 1 Versuch nach 1s, danach Versuch nach 2s, danach Versuch nach 4s
@RetryableTopic(
attempts = "4",
backoff = @Backoff(
delay = 1000L,
multiplier = 2
)
)
@KafkaListener(
topics = KafkaConstants.CUSTOMER_EVENTS_TOPIC
)
public void consume(
@Payload CustomerEventRecord record,
@Valid @Payload CustomerEventRecord record,
@Header(KafkaHeaders.RECEIVED_PARTITION) String partition,
@Header(KafkaHeaders.OFFSET) int offset,
Acknowledgment acknowledgment
Expand Down

0 comments on commit 791a578

Please sign in to comment.