Skip to content

Commit

Permalink
Add applicationId in KafkaError schema (#272)
Browse files Browse the repository at this point in the history
* Add applicationId to KafkaError avro schema

* Fill applicationId when building KafkaError in ExceptionHandler and GenericErrorProcessor

* In ProcessingErrorTest, add applicationId in KafkaError message

* In TopologyErrorHandlerTest, add assertions to check for KafkaError fields

* Remove usage of getFirst that is only available for Java21

---------

Co-authored-by: Bastien Bigué <[email protected]>
  • Loading branch information
BastienBigue and Bastien Bigué authored Nov 14, 2024
1 parent eacf6c5 commit 1332dd3
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,13 @@ void shouldSendExceptionToDlqWhenProcessingValueIsInvalid() {
stringInputTopic.pipeInput("key", "error");

var resultDlq = dlqTopic.readValuesToList();
var resultOutput = stringOutputTopic.readValuesToList();

assertEquals(1, resultDlq.size());
var record = resultDlq.get(0);
assertEquals(record.getApplicationId(), "test");
assertEquals(record.getTopic(), "stringTopic");
assertEquals(record.getValue(), "error");

var resultOutput = stringOutputTopic.readValuesToList();
assertEquals(0, resultOutput.size());
}

Expand Down
6 changes: 6 additions & 0 deletions kstreamplify-core/src/main/avro/kafka-error.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@
"name": "topic",
"type": "string",
"doc": "Source topic of erroneous message"
},
{
"name": "applicationId",
"type": ["null", "string"],
"doc": "Application id of the application that produced the erroneous message",
"default": null
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public DeserializationHandlerResponse handle(ProcessorContext processorContext,
"An exception occurred during the stream internal deserialization")
.setOffset(consumerRecord.offset())
.setPartition(consumerRecord.partition())
.setTopic(consumerRecord.topic());
.setTopic(consumerRecord.topic())
.setApplicationId(processorContext.applicationId());

boolean isCausedByKafka = consumptionException.getCause() instanceof KafkaException;
// If the cause of this exception is a KafkaException and if getCause == sourceException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

/**
Expand Down Expand Up @@ -58,7 +59,9 @@ public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]>
.setOffset(-1)
.setPartition(
producerRecord.partition() == null ? -1 : producerRecord.partition())
.setTopic(producerRecord.topic());
.setTopic(producerRecord.topic())
.setApplicationId(
KafkaStreamsExecutionContext.getProperties().getProperty(StreamsConfig.APPLICATION_ID_CONFIG));

producer.send(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(),
producerRecord.key(), builder.build())).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public void process(FixedKeyRecord<String, ProcessingError<V>> fixedKeyRecord) {
recordMetadata != null && recordMetadata.topic() != null ? recordMetadata.topic() :
"Outside topic context")
.setValue(fixedKeyRecord.value().getKafkaRecord())
.setApplicationId(context().applicationId())
.build();

context().forward(fixedKeyRecord.withValue(error));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ void shouldCreateProcessingErrorFromAvroRecord() {
.setPartition(1)
.setTopic("Topic")
.setValue("Value")
.setApplicationId("ApplicationId")
.build();

ProcessingError<KafkaError> processingError = new ProcessingError<>(exception, contextMessage, kafkaRecord);
Expand All @@ -57,6 +58,7 @@ void shouldCreateProcessingErrorFromAvroRecord() {
"offset": 1,
"cause": "Cause",
"topic": "Topic",
"applicationId": "ApplicationId",
"value": "Value"
}""", processingError.getKafkaRecord());
}
Expand Down

0 comments on commit 1332dd3

Please sign in to comment.