diff --git a/README.md b/README.md index fd59205..ae336c2 100644 --- a/README.md +++ b/README.md @@ -510,7 +510,8 @@ You need to install rdkafka gem. partition_key_key (string) :default => 'partition_key' message_key_key (string) :default => 'message_key' default_topic (string) :default => nil - use_default_for_unknown_topic (bool) :default => false + use_default_for_unknown_topic (bool) :default => false + use_default_for_unknown_partition_error (bool) :default => false default_partition_key (string) :default => nil default_message_key (string) :default => nil exclude_topic_key (bool) :default => false diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb index 8861202..044efe6 100644 --- a/lib/fluent/plugin/out_rdkafka2.rb +++ b/lib/fluent/plugin/out_rdkafka2.rb @@ -66,6 +66,7 @@ class Fluent::Rdkafka2Output < Output config_param :default_topic, :string, :default => nil, :desc => "Default output topic when record doesn't have topic field" config_param :use_default_for_unknown_topic, :bool, :default => false, :desc => "If true, default_topic is used when topic not found" + config_param :use_default_for_unknown_partition_error, :bool, :default => false, :desc => "If true, default_topic is used when received unknown_partition error" config_param :message_key_key, :string, :default => 'message_key', :desc => "Field for kafka message key" config_param :default_message_key, :string, :default => nil config_param :partition_key, :string, :default => 'partition', :desc => "Field for kafka partition" @@ -234,8 +235,8 @@ def add(level, message = nil) @rdkafka = Rdkafka::Config.new(config) if @default_topic.nil? - if @use_default_for_unknown_topic - raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic is true" + if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error + raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic or use_default_for_unknown_partition_error is true" end if @chunk_keys.include?(@topic_key) && !@chunk_key_tag log.warn "Use '#{@topic_key}' field of event record for topic but no fallback. Recommend to set default_topic or set 'tag' in buffer chunk keys like " @@ -504,6 +505,16 @@ def enqueue_with_retry(producer, topic, record_buf, message_key, partition, head actual_topic = @default_topic retry end + raise e + # https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/src/rdkafka.h#L305 + # RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION + when :unknown_partition + if @use_default_for_unknown_partition_error && actual_topic != @default_topic + log.debug "failed writing to topic '#{actual_topic}' with error '#{e.to_s}'. Writing message to topic '#{@default_topic}'" + actual_topic = @default_topic + retry + end + raise e else raise e