diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java index 89cf7aa0..3c1cc745 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java @@ -8,6 +8,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.common.KafkaException; import java.util.Map; @@ -41,7 +42,13 @@ public DeserializationHandlerResponse handle(ProcessorContext processorContext, .setPartition(consumerRecord.partition()) .setTopic(consumerRecord.topic()); - producer.send(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), consumerRecord.key(), builder.build())).get(); + boolean isCausedByKafka = consumptionException.getCause() instanceof KafkaException; + //If the cause of this exception is a KafkaException and if getCause == sourceException (see Throwable.getCause - including SerializationException) + //use to handle poison pill => sent message into dlq and continue our life. + if(isCausedByKafka || consumptionException.getCause() == null) { + producer.send(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), consumerRecord.key(), builder.build())).get(); + return DeserializationHandlerResponse.CONTINUE; + } } catch (InterruptedException ie) { log.error("Interruption while sending the deserialization exception {} for key {}, value {} and topic {} to DLQ topic {}", consumptionException, consumerRecord.key(), consumerRecord.value(), consumerRecord.topic(), KafkaStreamsExecutionContext.getDlqTopicName(), ie); @@ -49,10 +56,11 @@ public DeserializationHandlerResponse handle(ProcessorContext processorContext, } catch (Exception e) { log.error("Cannot send the deserialization exception {} for key {}, value {} and topic {} to DLQ topic {}", consumptionException, consumerRecord.key(), consumerRecord.value(), consumerRecord.topic(), KafkaStreamsExecutionContext.getDlqTopicName(), e); - return DeserializationHandlerResponse.FAIL; } - return DeserializationHandlerResponse.CONTINUE; + // here we only have exception like UnknownHostException for example or TimeoutException ... + // situation example: we cannot ask schema registry because the url is unavailable + return DeserializationHandlerResponse.FAIL; } /**