diff --git a/README.md b/README.md index 7a3465e..814d40e 100644 --- a/README.md +++ b/README.md @@ -212,6 +212,7 @@ If `ruby-kafka` doesn't fit your kafka environment, check `rdkafka2` plugin inst discard_kafka_delivery_failed (bool) :default => false (No discard) partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32' share_producer (bool) :default => false + idempotent (bool) :default => false # If you intend to rely on AWS IAM auth to MSK with long lived credentials # https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb index 15da9a1..b306cfb 100644 --- a/lib/fluent/plugin/out_rdkafka2.rb +++ b/lib/fluent/plugin/out_rdkafka2.rb @@ -42,6 +42,7 @@ class Fluent::Rdkafka2Output < Output config_param :default_message_key, :string, :default => nil config_param :partition_key, :string, :default => 'partition', :desc => "Field for kafka partition" config_param :default_partition, :integer, :default => nil + config_param :idempotent, :bool, :default => false, :desc => 'Enable idempotent producer' config_param :output_data_type, :string, :default => 'json', :obsoleted => "Use section instead" config_param :output_include_tag, :bool, :default => false, :obsoleted => "Use section instead" config_param :output_include_time, :bool, :default => false, :obsoleted => "Use section instead" @@ -286,6 +287,7 @@ def build_config config[:"batch.num.messages"] = @rdkafka_message_max_num if @rdkafka_message_max_num config[:"sasl.username"] = @username if @username config[:"sasl.password"] = @password if @password + config[:"enable.idempotence"] = @idempotent if @idempotent @rdkafka_options.each { |k, v| config[k.to_sym] = v