KafkaProducerErrorHandling
For each input tuple, one message is sent to Kafka for each configured topic, using the standard producer client API.
During the `process` method the message is handed to the Kafka client, which is an asynchronous operation returning a `Future`. Tuple processing then continues with the operator returning from the `process` method. The message is not delivered to Kafka until some time later, and the `Future` can be used to determine when it has been received at the server (subject to the `acks` setting).
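For illustration, a minimal sketch of the asynchronous send; the broker address, topic, key, and value here are placeholder assumptions, not the operator's actual configuration:

```java
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AsyncSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() only queues the record and returns immediately; the
            // Future completes once the server has acknowledged the record
            // (subject to the acks setting).
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("topicA", "key", "value"));
            // process() would return here; the message may not have reached
            // the server yet.
        }
    }
}
```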
For a consistent region the operator must ensure that at-least-once processing is achieved, by ensuring that every message sent before the drain has been received at the server.
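One way to satisfy this, sketched below under the assumption that the operator keeps the `Future` from every `send()` issued since the last drain (the field and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.RecordMetadata;

public class DrainSketch {
    // Futures collected from every send() since the last drain (hypothetical).
    private final List<Future<RecordMetadata>> pendingSends = new ArrayList<>();

    // Block until every in-flight message has been acknowledged by the
    // server; a failed send surfaces as an ExecutionException here, which
    // fails the drain and hence the consistent region.
    public void drain() throws Exception {
        for (Future<RecordMetadata> f : pendingSends) {
            f.get();
        }
        pendingSends.clear();
    }
}
```

`producer.flush()` would also block until all buffered records complete, but it does not report per-record failures, so the `Future`s (or the completion callback) are still needed to detect them.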
The message send can fail with these exceptions:
- `InterruptException` - if the thread is interrupted while blocked
- `SerializationException` - if the key or value are not valid objects given the configured serializers
- `TimeoutException` - if the time taken for fetching metadata or allocating memory for the record has surpassed `max.block.ms`
- `KafkaException` - if a Kafka-related error occurs that does not belong to the public API exceptions
These will be handled as follows (a handling sketch follows the list):
- `InterruptException` - rethrow the error (or don't catch it); the operator is being shut down.
- `SerializationException` - rethrow the error (or don't catch it); an internal/application/data(?) error. Possibly a data error, for example an `rstring` with invalid UTF-8? Needs investigation.
- `TimeoutException` - TBD
- `KafkaException` - TBD
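A sketch of this policy around the synchronous exceptions that `send()` can throw; the class and method here are hypothetical, and the TBD branches simply rethrow for now:

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;

public class SendErrorHandlingSketch {
    static <K, V> void sendWithPolicy(Producer<K, V> producer,
                                      ProducerRecord<K, V> record) {
        try {
            producer.send(record);
        } catch (InterruptException e) {
            throw e; // operator is being shut down; let it propagate
        } catch (SerializationException e) {
            throw e; // likely a data error, e.g. invalid UTF-8 in the value
        } catch (TimeoutException e) {
            throw e; // TBD: metadata fetch or buffer allocation exceeded max.block.ms
        } catch (KafkaException e) {
            throw e; // TBD: other Kafka-related failure
        }
    }
}
```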
TODO
- Can there be a `ConnectException`, which is a sub-class of `KafkaException`? If so we might want to add the logic that tries to reconnect. -- There is no connection exception. The Kafka client doesn't tell you whether you are connected; the only way to find out (that I have found) is to use the `producer.metrics()` method and examine the connection attribute (see the sketch after this list). The Kafka client handles connection and reconnection itself: if we lose the connection to the broker, Kafka will reconnect when the broker is available again.
When in an autonomous region and a tuple is sent to multiple topics, if we retry in the process method, do we need to keep track of which topics were successfully processed?
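A sketch of the metrics check mentioned in the first item above; the `connection-count` metric name is my assumption based on the standard producer metrics group:

```java
import java.util.Map;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ConnectionCheckSketch {
    // Scan producer.metrics() for the producer-level "connection-count"
    // metric (assumed name); a value of 0 suggests no open broker connection.
    static double connectionCount(Producer<?, ?> producer) {
        for (Map.Entry<MetricName, ? extends Metric> entry
                : producer.metrics().entrySet()) {
            if ("connection-count".equals(entry.getKey().name())) {
                return entry.getValue().value();
            }
        }
        return 0.0;
    }
}
```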
Note: throwing an exception from the `process` method will result in that tuple being lost:
- consistent region - the region will fail and tuples will be replayed from the source. When a single tuple is sent to multiple topics, a message might have been sent to one or more of the topics before the failure.
- autonomous region - the PE will fail and restart, and the tuple will not be replayed. When the tuple is sent to a single topic no message will have been sent; when it is sent to multiple topics, a message might have been sent to one or more of the topics before the failure.
Each message publish provides a callback method that is executed when the message has been sent to the Kafka server or an exception was thrown (see the sketch after the following list): https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/Callback.html#onCompletion(org.apache.kafka.clients.producer.RecordMetadata,%20java.lang.Exception)
- Non-retryable exceptions - the message has been lost.
- Retryable exceptions - TBD. Not clear how this works: is the producer client doing the retry, so the callback might be called multiple times for the same message?
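A sketch of the callback pattern; topic, key, and value are placeholders. As far as I can tell from the client documentation, retries happen inside the producer before the callback fires, so `onCompletion` should be invoked once per record with the final outcome, but this needs verifying:

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallbackSketch {
    static void sendWithCallback(Producer<String, String> producer) {
        producer.send(new ProducerRecord<>("topicA", "key", "value"),
                new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        // Exactly one of metadata / e is non-null.
                        if (e != null) {
                            // Send failed; without a reset/replay the message
                            // is lost.
                        } else {
                            // metadata confirms the record reached the server
                            // (topic, partition, offset).
                        }
                    }
                });
    }
}
```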
If a required `Future` fails, then the message is lost and the exception should be thrown. Cleanup?
- consistent region - any failure to send any message needs to cause the region to reset, e.g. by failing the operator by throwing an exception.
- autonomous region - most likely the same: there should be a failure to indicate that the final marker did not successfully process all the tuples. Throwing an exception from `drain` will result in the `processPunctuation` method throwing the exception.
- TBD
- TBD Describe behaviour
- Effect on state of operator, e.g. list of Futures if we have them