diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index 5d149d731e3..ab21762b275 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -60,6 +60,9 @@ static int process_message(struct flb_in_kafka_config *ctx, { struct flb_log_event_encoder *log_encoder = ctx->log_encoder; int ret; + struct flb_time out_time; + void *out_buf; + size_t out_size; ret = flb_log_event_encoder_begin_record(log_encoder); @@ -128,11 +131,51 @@ static int process_message(struct flb_in_kafka_config *ctx, if (ret == FLB_EVENT_ENCODER_SUCCESS) { if (rkm->payload) { - if (ctx->format != FLB_IN_KAFKA_FORMAT_JSON || + if (ctx->parser) { + /* Reset time for each line */ + flb_time_zero(&out_time); + + /* Use the defined parser */ + ret = flb_parser_do(ctx->parser, rkm->payload, rkm->len, + &out_buf, &out_size, &out_time); + + if (ret >= 0) { + if (flb_time_to_nanosec(&out_time) == 0L) { + flb_time_get(&out_time); + } + ret = flb_log_event_encoder_set_timestamp(log_encoder, &out_time); + + if (ret == FLB_EVENT_ENCODER_SUCCESS){ + ret = flb_log_event_encoder_append_body_raw_msgpack(log_encoder, + out_buf, + out_size); + } + + flb_free(out_buf); + } + else { + flb_plg_warn(ctx->ins, + "failed to parse payload, \ + fluentbit error code : %d, \ + return to default behaver\n", + ret); + + if (ctx->format != FLB_IN_KAFKA_FORMAT_JSON || try_json(log_encoder, rkm)) { - ret = flb_log_event_encoder_append_body_string(log_encoder, - rkm->payload, - rkm->len); + ret = flb_log_event_encoder_append_body_string(log_encoder, + rkm->payload, + rkm->len); + } + } + } + else{ + /* return to default behavior */ + if (ctx->format != FLB_IN_KAFKA_FORMAT_JSON || + try_json(log_encoder, rkm)) { + ret = flb_log_event_encoder_append_body_string(log_encoder, + rkm->payload, + rkm->len); + } } } else { @@ -164,6 +207,7 @@ static int in_kafka_collect(struct flb_input_instance *ins, rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1); if (!rkm) { + flb_plg_debug(ins, "no message polled, break collection loop"); break; } @@ -177,12 +221,34 @@ static int in_kafka_collect(struct flb_input_instance *ins, flb_plg_debug(ins, "kafka message received"); ret = process_message(ctx, rkm); - + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_debug(ins, + "parsed kafka message, \ + topic: %s, offset: %s, partition: %s\n", + rd_kafka_topic_name(rkm -> rkt), + rkm->offset, + rkm->partition); + if ( ctx -> auto_commit ) { + rd_kafka_offset_store_message(rkm); + } + else{ + rd_kafka_commit_message(ctx -> kafka.rk,rkm,0); + } + } + else{ + flb_plg_warn(ins, + "encode kafka message error, \ + topic: %s, offset: %s, partition: %s\n", + rd_kafka_topic_name(rkm -> rkt), + rkm->offset, + rkm->partition); + rd_kafka_message_destroy(rkm); + continue; + } + rd_kafka_message_destroy(rkm); - /* TO-DO: commit the record based on `ret` */ - rd_kafka_commit(ctx->kafka.rk, NULL, 0); - /* Break from the loop when reaching the limit of polling if available */ if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED && ctx->log_encoder->output_length > ctx->polling_threshold + 512) { @@ -237,6 +303,17 @@ static int in_kafka_init(struct flb_input_instance *ins, return -1; } + /* parser settings, need to set after flb_input_config_map_set call */ + if (ctx->parser_name) { + flb_plg_debug(ctx->ins, "request parser '%s'", ctx->parser_name); + ctx->parser = flb_parser_get(ctx->parser_name, config); + if (!ctx->parser) { + flb_plg_error(ctx->ins, "requested parser '%s' not found", + ctx->parser_name); + return -1; + } + } + kafka_conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 1); if (!kafka_conf) { flb_plg_error(ins, "Could not initialize kafka config object"); @@ -263,6 +340,25 @@ static int in_kafka_init(struct flb_input_instance *ins, rd_kafka_err2str(err), conf_val); goto init_error; } + + /* disable auto_offset_store, manully store/commit offset latter */ + res = rd_kafka_conf_set(kafka_conf, "enable.auto.offset.store", "false", + errstr, sizeof(errstr)); + if (res != RD_KAFKA_CONF_OK) { + flb_plg_error(ins, "Failed to set up enable.auto.offset.store: %s, val = %s", + rd_kafka_err2str(err), "false"); + goto init_error; + } + + if (!ctx ->auto_commit){ + res = rd_kafka_conf_set(kafka_conf, "enable.auto.commit", "false", + errstr, sizeof(errstr)); + if (res != RD_KAFKA_CONF_OK) { + flb_plg_error(ins, "Failed to set up enable.auto.commit : %s, val = %s", + rd_kafka_err2str(err), "false"); + goto init_error; + } + } } else { ctx->polling_threshold = FLB_IN_KAFKA_UNLIMITED; @@ -402,6 +498,12 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, format_str), "Set the data format which will be used for parsing records." }, + { + FLB_CONFIG_MAP_STR, "parser", (char *)NULL, + 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, parser_name), + "Set the data parser which will be used for \ + parsing kafka message payload." + }, { FLB_CONFIG_MAP_STR, "brokers", (char *)NULL, 0, FLB_FALSE, 0, @@ -428,6 +530,13 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size), "Set the maximum size of chunk" }, + { + FLB_CONFIG_MAP_BOOL, "auto_commit", false, + 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, auto_commit), + "Set if enable the auto_commit \ + Set it to true when the throughtput \ + is the game" + }, /* EOF */ {0} }; diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index b56d9c66893..1b82cbe63ed 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -48,6 +48,9 @@ struct flb_in_kafka_config { int coll_fd; size_t buffer_max_size; /* Maximum size of chunk allocation */ size_t polling_threshold; + flb_sds_t parser_name; /* Bame of the parser */ + struct flb_parser *parser; /* Parser */ + bool auto_commit; /* Auto commit switch */ }; #endif