Kafka Konsumer provides an easy implementation of a Kafka consumer with a built-in retry/exception manager (kafka-cronsumer).
- Added the ability to manipulate Kafka message headers (see the sketch after this list).
- Added a transactional retry feature. Set it to false if you want to apply the exception/retry strategy only to failed messages.
- Enabled manual commit in both single and batch consuming modes.
- Enabled consumer pause/resume functionality. Please refer to its example and its how-it-works documentation.
- Bumped kafka-cronsumer to the latest version:
  - Backoff strategy support (linear and exponential options)
  - Added a message key for retried messages
  - Added `x-error-message` to show what error the message got during processing
  - Reduced memory allocations
  - Increased throughput by changing the internal concurrency structure
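For instance, headers can be inspected (or rewritten) inside the consume function. A minimal sketch, assuming the `Headers` field with `Key`/`Value` pairs carried by `kafka.Message`; plug it into `ConsumeFn` as in the examples below:

```go
func consumeFn(message *kafka.Message) error {
	// inspect (or rewrite) the headers attached to the incoming message
	for _, header := range message.Headers {
		fmt.Printf("header %s=%s\n", header.Key, string(header.Value))
	}
	return nil
}
```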
You can get the latest version via `go get github.com/Trendyol/kafka-konsumer/v2@latest`.

- You need to change the import path from `github.com/Trendyol/kafka-konsumer` to `github.com/Trendyol/kafka-konsumer/v2`.
- You need to change your consume function to a pointer signature, as shown below.
- We moved `messageGroupDuration` from `batchConfiguration.messageGroupDuration` to the root level, because this field is also used by the single (non-batch) consumer.
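For example, a v1 consume function migrates to v2 like this (a minimal sketch based on the examples below):

```go
// before (v1): the consume function received the message by value
// func consumeFn(message kafka.Message) error { ... }

// after (v2): the consume function receives a pointer to the message
func consumeFn(message *kafka.Message) error {
	fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
	return nil
}
```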
```sh
go get github.com/Trendyol/kafka-konsumer/v2@latest
```
You can find a number of ready-to-run examples in this directory. After running the `docker-compose up` command, you can run any application you want.
#### Simple Consumer
```go
package main

import (
	"fmt"

	kafka "github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		ConsumeFn:    consumeFn,
		RetryEnabled: false,
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}

func consumeFn(message *kafka.Message) error {
	fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
	return nil
}
```
#### Simple Consumer With Retry/Exception Option
```go
package main

import (
	"fmt"
	"time"

	kafka "github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		RetryEnabled: true,
		RetryConfiguration: kafka.RetryConfiguration{
			Topic:         "retry-topic",
			StartTimeCron: "*/1 * * * *",
			WorkDuration:  50 * time.Second,
			MaxRetry:      3,
		},
		ConsumeFn: consumeFn,
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}

func consumeFn(message *kafka.Message) error {
	fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
	return nil
}
```
#### With Batch Option
```go
package main

import (
	"fmt"
	"time"

	kafka "github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		LogLevel:     kafka.LogLevelDebug,
		RetryEnabled: true,
		RetryConfiguration: kafka.RetryConfiguration{
			Brokers:       []string{"localhost:29092"},
			Topic:         "retry-topic",
			StartTimeCron: "*/1 * * * *",
			WorkDuration:  50 * time.Second,
			MaxRetry:      3,
		},
		MessageGroupDuration: time.Second,
		BatchConfiguration: kafka.BatchConfiguration{
			MessageGroupLimit: 1000,
			BatchConsumeFn:    batchConsumeFn,
		},
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}

func batchConsumeFn(messages []*kafka.Message) error {
	fmt.Printf("%d messages received, the first value is %s\n", len(messages), messages[0].Value)
	return nil
}
```
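If a batch needs shaping before it is handed to `batchConsumeFn`, the `batchConfiguration.preBatchFn` hook (listed in the configuration table below) can transform the collected messages first. A minimal sketch, assuming the hook takes and returns the message slice; it deduplicates messages by key:

```go
func preBatchFn(messages []*kafka.Message) []*kafka.Message {
	seen := make(map[string]struct{}, len(messages))
	deduped := make([]*kafka.Message, 0, len(messages))

	// keep the first occurrence of each key, preserving the original order
	for _, message := range messages {
		key := string(message.Key)
		if _, ok := seen[key]; ok {
			continue
		}
		seen[key] = struct{}{}
		deduped = append(deduped, message)
	}
	return deduped
}
```

Wire it in next to `BatchConsumeFn`, e.g. `BatchConfiguration: kafka.BatchConfiguration{PreBatchFn: preBatchFn, BatchConsumeFn: batchConsumeFn, ...}`.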
#### With Disabling Transactional Retry
```go
package main

import (
	"errors"
	"time"

	kafka "github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		LogLevel:           kafka.LogLevelDebug,
		RetryEnabled:       true,
		TransactionalRetry: kafka.NewBoolPtr(false),
		RetryConfiguration: kafka.RetryConfiguration{
			Brokers:       []string{"localhost:29092"},
			Topic:         "retry-topic",
			StartTimeCron: "*/1 * * * *",
			WorkDuration:  50 * time.Second,
			MaxRetry:      3,
		},
		MessageGroupDuration: time.Second,
		BatchConfiguration: kafka.BatchConfiguration{
			MessageGroupLimit: 1000,
			BatchConsumeFn:    batchConsumeFn,
		},
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}

func batchConsumeFn(messages []*kafka.Message) error {
	// you can add custom error handling here and flag only the failed messages
	for i := range messages {
		if i%2 == 0 {
			messages[i].IsFailed = true
		}
	}

	// you must return an error here so that the flagged messages are retried
	return errors.New("err")
}
```
- Please refer to the Producer Interceptor example.
- Please refer to the Tracing example.
- Please refer to the Pause/Resume example (a minimal sketch follows below).
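As a quick illustration of pausing and resuming consumption, here is a minimal sketch, assuming the `Pause`/`Resume` methods named by the feature list above; see the dedicated example for the full flow:

```go
package main

import (
	"time"

	kafka "github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		ConsumeFn: func(message *kafka.Message) error { return nil },
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()

	// pause fetching new messages, e.g. while a downstream dependency is degraded
	consumer.Pause()

	// ... wait until the dependency recovers ...
	time.Sleep(30 * time.Second)

	// resume consuming from where the consumer left off
	consumer.Resume()
}
```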
In this example, we demonstrate how to create a Grafana dashboard and how to define alerts in Prometheus. To see it, go to the with-grafana folder under the examples folder, bring up the infrastructure with `docker compose up`, and then start the application.
Under the examples/with-sasl-plaintext folder, you can find an example of consumer integration with the SASL/PLAIN mechanism. To try it, run `docker compose up` under that folder and then start the application.
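A minimal sketch of what that wiring looks like, assuming the Go field names mirror the `sasl.authType`, `sasl.username`, and `sasl.password` rows in the configuration table below; check the with-sasl-plaintext example for the exact names:

```go
package main

import (
	kafka "github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		ConsumeFn: func(message *kafka.Message) error { return nil },
		// SASL/PLAIN credentials; field names assumed from the sasl.* config rows
		SASL: &kafka.SASLConfig{
			Type:     "plain", // maps to sasl.authType: "SCRAM" or "PLAIN"
			Username: "client",
			Password: "client-secret",
		},
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}
```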
config | description | default |
---|---|---|
`reader` | Describes all segmentio kafka reader configurations | |
`consumeFn` | Kafka consumer function; if retry is enabled, it is also used to consume retriable messages | |
`skipMessageByHeaderFn` | Function to filter messages based on headers; return true if you want to skip the message | nil |
`logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info |
`concurrency` | Number of goroutines used at listeners | 1 |
`retryEnabled` | Whether the retry/exception consumer is enabled | false |
`transactionalRetry` | Set to false if you want to apply the exception/retry strategy only to failed messages | true |
`commitInterval` | Indicates the interval at which offsets are committed to the broker | 1s |
`verifyTopicOnStartup` | Checks the existence of the given topic(s) on the Kafka cluster | false |
`rack` | see doc | |
`clientId` | see doc | |
`messageGroupDuration` | Maximum time to wait for a batch | 1s |
`metricPrefix` | Prefix used for Prometheus fully-qualified metric names. If not provided, the default is `kafka_konsumer`. Currently there are two exposed Prometheus metrics, `processed_messages_total` and `unprocessed_messages_total`, so with the default prefix the metric names are `kafka_konsumer_processed_messages_total_current` and `kafka_konsumer_unprocessed_messages_total_current` | kafka_konsumer |
`dial.Timeout` | see doc | no timeout |
`dial.KeepAlive` | see doc | not enabled |
`transport.DialTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport) | 5s |
`transport.IdleTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport) | 30s |
`transport.MetadataTTL` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport) | 6s |
`transport.MetadataTopics` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport) | all topics in cluster |
`distributedTracingEnabled` | Indicates whether OpenTelemetry support is on or off for consume and produce operations | false |
`distributedTracingConfiguration.TracerProvider` | see doc | otel.GetTracerProvider() |
`distributedTracingConfiguration.Propagator` | see doc | otel.GetTextMapPropagator() |
`retryConfiguration.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport) | |
`retryConfiguration.startTimeCron` | Cron expression defining when the retry consumer (kafka-cronsumer) starts to work | |
`retryConfiguration.metricPrefix` | Prefix used for Prometheus fully-qualified metric names. If not provided, the default is `kafka_cronsumer`. Currently there are two exposed Prometheus metrics, `retried_messages_total_current` and `discarded_messages_total_current`, so with the default prefix the metric names are `kafka_cronsumer_retried_messages_total_current` and `kafka_cronsumer_discarded_messages_total_current` | kafka_cronsumer |
`retryConfiguration.workDuration` | Duration during which the exception consumer actively consumes messages | |
`retryConfiguration.queueCapacity` | see doc | 100 |
`retryConfiguration.producerBatchSize` | see doc | 100 |
`retryConfiguration.producerBatchTimeout` | see doc | 100 |
`retryConfiguration.topic` | Retry/exception topic name | |
`retryConfiguration.brokers` | Retry topic broker URLs | |
`retryConfiguration.maxRetry` | Maximum number of attempts to retry a message | 3 |
`retryConfiguration.concurrency` | Number of goroutines used at listeners | 1 |
`retryConfiguration.tls.rootCAPath` | see doc | "" |
`retryConfiguration.tls.intermediateCAPath` | Same as rootCA; if you want to specify a second root CA, use it together with rootCAPath | "" |
`retryConfiguration.sasl.authType` | `SCRAM` or `PLAIN` | |
`retryConfiguration.sasl.username` | SCRAM or PLAIN username | |
`retryConfiguration.sasl.password` | SCRAM or PLAIN password | |
`retryConfiguration.skipMessageByHeaderFn` | Function to filter messages based on headers; return true if you want to skip the message | nil |
`retryConfiguration.verifyTopicOnStartup` | Checks the existence of the given retry topic on the Kafka cluster | false |
`batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | 100 |
`batchConfiguration.messageGroupByteSizeLimit` | Maximum number of bytes in a batch | |
`batchConfiguration.batchConsumeFn` | Kafka batch consumer function; if retry is enabled, it is also used to consume retriable messages | |
`batchConfiguration.preBatchFn` | Function for transforming messages before batch consuming starts | |
`batchConfiguration.balancer` | see doc | leastBytes |
`tls.rootCAPath` | see doc | "" |
`tls.intermediateCAPath` | Same as rootCA; if you want to specify a second root CA, use it together with rootCAPath | "" |
`sasl.authType` | `SCRAM` or `PLAIN` | |
`sasl.username` | SCRAM or PLAIN username | |
`sasl.password` | SCRAM or PLAIN password | |
`logger` | Set this if you want to provide a custom logger | info |
`apiEnabled` | Enables the metrics API | false |
`apiConfiguration.port` | Sets the API port | 8090 |
`apiConfiguration.healtCheckPath` | Sets the health check path | healthcheck |
`metricConfiguration.path` | Sets the metric endpoint path | /metrics |
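For instance, `skipMessageByHeaderFn` above lets you short-circuit messages before they reach your consume function. A minimal sketch, assuming the function receives the message headers and returns true to skip; the `x-skip` header name is hypothetical:

```go
func skipMessageByHeaderFn(headers []kafka.Header) bool {
	for _, header := range headers {
		// "x-skip" is a hypothetical header set by an upstream producer
		if header.Key == "x-skip" && string(header.Value) == "true" {
			return true
		}
	}
	return false
}
```

Set it on the consumer config, e.g. `SkipMessageByHeaderFn: skipMessageByHeaderFn`, alongside `ConsumeFn`.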
Kafka Konsumer offers an API that exposes several metrics.
Metric Name | Description | Value Type |
---|---|---|
kafka_konsumer_processed_messages_total_current | Total number of processed messages. | Counter |
kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter |
kafka_konsumer_error_count_during_fetching_message_total_current | Total number of errors during message fetching. | Counter |
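To expose these metrics, enable the API on the consumer config. A minimal sketch, assuming the Go field name mirrors the `apiEnabled` row in the configuration table above; the port and paths default per the `apiConfiguration.*` and `metricConfiguration.*` rows:

```go
package main

import (
	kafka "github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		ConsumeFn: func(message *kafka.Message) error { return nil },
		// serves the metric endpoint (and health check) on port 8090 by default
		APIEnabled: true,
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}
```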