KafkaProducerErrorHandling

Alex-Cook4 edited this page Dec 2, 2016 · 5 revisions

KafkaProducer error handling PROPOSAL

KafkaProducer tuple processing

For each input tuple, one message is sent to Kafka for each configured topic using the standard producer client API. During the process method the message is handed to the Kafka client; this is an asynchronous operation that returns a Future. Tuple processing then continues, with the operator returning from the process method.

The message is not delivered to Kafka until some time later, and the Future can be used to determine when it has been received at the server (subject to the acks setting).

In a consistent region the operator must achieve at-least-once processing by ensuring that every message sent before the drain has been received at the server.
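
One possible way to meet this requirement is to keep the Future for every send and block on all of them when draining. The following is only a sketch under stated assumptions: PendingSends, track and drain are hypothetical names, and a plain java.util.concurrent.Future stands in for the Future returned by the Kafka client so that the example compiles without the Kafka library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper: remembers the Future for every message handed to the client.
class PendingSends {
    private final List<Future<?>> pending = new ArrayList<>();

    // Would be called from process() once the asynchronous send has returned its Future.
    synchronized void track(Future<?> sendResult) {
        pending.add(sendResult);
    }

    synchronized int size() {
        return pending.size();
    }

    // Would be called from drain(): blocks until every tracked send has completed.
    // The first failed send is rethrown so that the region can reset and replay.
    synchronized void drain() throws Exception {
        try {
            for (Future<?> f : pending) {
                f.get();
            }
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the original send failure where possible.
            Throwable cause = e.getCause();
            throw (cause instanceof Exception) ? (Exception) cause : e;
        } finally {
            // Cleanup: the list is emptied whether or not the drain succeeded.
            pending.clear();
        }
    }
}
```

Clearing the list in the finally block is one answer to the cleanup question for Futures; whether stale Futures should instead be cancelled is left open here.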

Error handling during tuple processing

The send call can fail with these exceptions:

  • InterruptException - If the thread is interrupted while blocked
  • SerializationException - If the key or value are not valid objects given the configured serializers
  • TimeoutException - If the time taken for fetching metadata or allocating memory for the record has surpassed max.block.ms.
  • KafkaException - If a Kafka related error occurs that does not belong to the public API exceptions

https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

These will be handled as:

  • InterruptException - Rethrow error (or don't catch it) - operator is being shut down.
  • SerializationException - Rethrow error (or don't catch it) - Internal/application/data(?) error. Possible data error, for example an rstring with invalid UTF-8? Need to investigate.
  • TimeoutException - TBD
  • KafkaException - TBD

TODO

  • Can there be a ConnectException, which is a sub-class of KafkaException? If so, we might want to add logic that tries to reconnect. -- There is no connection exception. The KafkaClient doesn't tell you whether you are connected; the only way to find out (that I have found) is to use the producer.metrics() method and examine the connection attribute. The KafkaClient handles connection and reconnection: if we lose the connection to the broker, Kafka will reconnect when the broker is available again.

  • When in an autonomous region and a tuple is sent to multiple topics, if we retry in the process method, do we need to keep track of which topics were successfully processed?
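
If per-topic tracking does turn out to be needed, the bookkeeping could be as small as the sketch below. This is an illustration only, not a decided design: TopicProgress and its methods are hypothetical names, and the class records only what the operator tells it, without touching Kafka.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical per-tuple bookkeeping: which topics still need this tuple's message.
class TopicProgress {
    private final Set<String> remaining;

    TopicProgress(List<String> topics) {
        // LinkedHashSet keeps the configured topic order for retries.
        this.remaining = new LinkedHashSet<>(topics);
    }

    // Record that the send for this topic succeeded.
    void markSent(String topic) {
        remaining.remove(topic);
    }

    // Topics a retry would still have to send to, in configured order.
    List<String> remaining() {
        return new ArrayList<>(remaining);
    }

    boolean done() {
        return remaining.isEmpty();
    }
}
```

A retry in the process method would then loop over remaining() rather than over all configured topics, which avoids duplicating messages on topics that already succeeded.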

Note that throwing an exception from the process method will result in that tuple being lost:

  • consistent region - region will fail and tuples will be replayed from source. When a single tuple is sent to multiple topics, a message might have been sent to one or more of the topics before the failure.
  • autonomous region - PE will fail and restart. With a single topic, no message has been sent for the tuple. When a single tuple is sent to multiple topics, a message might have been sent to one or more of the topics before the failure.

Failure in the callback

Each send can supply a callback whose method is executed when the message has been acknowledged by the Kafka server or the send has failed with an exception. https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/Callback.html#onCompletion(org.apache.kafka.clients.producer.RecordMetadata,%20java.lang.Exception)

  • Non-retryable exceptions - Message has been lost.
  • Retryable exceptions - TBD - Not clear how this works. Is the producer client doing the retry, and if so, might the callback be called multiple times for the same message?
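
The callback runs on a thread owned by the producer client, so an exception seen there cannot simply be thrown into tuple processing. One option, sketched below, is to record the first failure and rethrow it later from an operator thread (for example from process or drain). The names are hypothetical: SendCallback is a stand-in for org.apache.kafka.clients.producer.Callback, with Object in place of RecordMetadata so that the example compiles without the Kafka library, and FailureRecorder is not an existing class.

```java
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for the Kafka Callback interface (assumption: same method shape).
interface SendCallback {
    void onCompletion(Object metadata, Exception exception);
}

// Hypothetical: remembers the first asynchronous send failure.
class FailureRecorder implements SendCallback {
    private final AtomicReference<Exception> firstFailure = new AtomicReference<>();

    @Override
    public void onCompletion(Object metadata, Exception exception) {
        if (exception != null) {
            // Keep only the first failure; later ones are ignored.
            firstFailure.compareAndSet(null, exception);
        }
    }

    // Called from an operator thread: rethrows and clears any recorded failure.
    void throwIfFailed() throws Exception {
        Exception e = firstFailure.getAndSet(null);
        if (e != null) {
            throw e;
        }
    }
}
```

This sketch does not distinguish retryable from non-retryable exceptions; that classification is still TBD above.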

Failure when draining

If a required Future fails then the message is lost and the exception should be thrown. Cleanup?

  • consistent region - Any failure to send any message needs to cause the region to reset, e.g. by failing the operator by throwing an exception
  • autonomous region - Most likely the same, there should be a failure to indicate that the final marker did not successfully process all the tuples. Throwing an exception from drain will result in the processPunctuation method throwing the exception.

Failure when resetting

  • TBD

Interaction with @catch

  • TBD Describe behaviour
  • Effect on state of operator, e.g. list of Futures if we have them