diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java index faeb7023..89cf7aa0 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java @@ -37,9 +37,9 @@ public DeserializationHandlerResponse handle(ProcessorContext processorContext, var builder = KafkaError.newBuilder(); enrichWithException(builder, consumptionException, consumerRecord.key(), consumerRecord.value()) .setContextMessage("An exception occurred during the stream internal deserialization") - .setOffset(processorContext.offset()) - .setPartition(processorContext.partition()) - .setTopic(processorContext.topic()); + .setOffset(consumerRecord.offset()) + .setPartition(consumerRecord.partition()) + .setTopic(consumerRecord.topic()); producer.send(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), consumerRecord.key(), builder.build())).get(); } catch (InterruptedException ie) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java index c70a35fb..c16bbcd6 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarter.java @@ -3,8 +3,6 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; -import static org.apache.commons.lang3.StringUtils.EMPTY; - /** * The Kafka Streams starter interface */