in_kafka: add parser support and improve performance #9726

Open · wants to merge 2 commits into master
125 changes: 117 additions & 8 deletions plugins/in_kafka/in_kafka.c
@@ -60,6 +60,9 @@ static int process_message(struct flb_in_kafka_config *ctx,
{
struct flb_log_event_encoder *log_encoder = ctx->log_encoder;
int ret;
struct flb_time out_time;
void *out_buf;
size_t out_size;

ret = flb_log_event_encoder_begin_record(log_encoder);

@@ -128,11 +131,51 @@

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
if (rkm->payload) {
if (ctx->parser) {
/* Reset time for each line */
flb_time_zero(&out_time);

/* Use the defined parser */
ret = flb_parser_do(ctx->parser, rkm->payload, rkm->len,
&out_buf, &out_size, &out_time);

if (ret >= 0) {
if (flb_time_to_nanosec(&out_time) == 0L) {
flb_time_get(&out_time);
}
ret = flb_log_event_encoder_set_timestamp(log_encoder, &out_time);

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_raw_msgpack(log_encoder,
out_buf,
out_size);
}

flb_free(out_buf);
}
else {
flb_plg_warn(ctx->ins,
"failed to parse payload, "
"fluent-bit error code: %d, "
"falling back to default behavior",
ret);

if (ctx->format != FLB_IN_KAFKA_FORMAT_JSON ||
try_json(log_encoder, rkm)) {
ret = flb_log_event_encoder_append_body_string(log_encoder,
rkm->payload,
rkm->len);
}
}
}
else {
/* return to default behavior */
if (ctx->format != FLB_IN_KAFKA_FORMAT_JSON ||
try_json(log_encoder, rkm)) {
ret = flb_log_event_encoder_append_body_string(log_encoder,
rkm->payload,
rkm->len);
}
}
}
else {
@@ -164,6 +207,7 @@ static int in_kafka_collect(struct flb_input_instance *ins,
rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1);

if (!rkm) {
flb_plg_debug(ins, "no message polled, break collection loop");
break;
}

@@ -177,12 +221,34 @@
flb_plg_debug(ins, "kafka message received");

ret = process_message(ctx, rkm);


if (ret == FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_debug(ins,
"encoded kafka message, "
"topic: %s, offset: %ld, partition: %d",
rd_kafka_topic_name(rkm->rkt),
(long) rkm->offset,
rkm->partition);
if (ctx->auto_commit) {
/* store the offset; librdkafka's auto commit flushes it later */
rd_kafka_offset_store_message(rkm);
}
else {
/* synchronous commit of this message's offset */
rd_kafka_commit_message(ctx->kafka.rk, rkm, 0);
}
}
else {
flb_plg_warn(ins,
"failed to encode kafka message, "
"topic: %s, offset: %ld, partition: %d",
rd_kafka_topic_name(rkm->rkt),
(long) rkm->offset,
rkm->partition);
rd_kafka_message_destroy(rkm);
continue;
}

rd_kafka_message_destroy(rkm);


/* Break from the loop when reaching the limit of polling if available */
if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED &&
ctx->log_encoder->output_length > ctx->polling_threshold + 512) {
@@ -237,6 +303,17 @@ static int in_kafka_init(struct flb_input_instance *ins,
return -1;
}

/* parser settings; must be set after the flb_input_config_map_set() call */
if (ctx->parser_name) {
flb_plg_debug(ctx->ins, "request parser '%s'", ctx->parser_name);
ctx->parser = flb_parser_get(ctx->parser_name, config);
if (!ctx->parser) {
flb_plg_error(ctx->ins, "requested parser '%s' not found",
ctx->parser_name);
return -1;
}
}

kafka_conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 1);
if (!kafka_conf) {
flb_plg_error(ins, "Could not initialize kafka config object");
@@ -263,6 +340,25 @@
rd_kafka_err2str(err), conf_val);
goto init_error;
}

/* disable auto.offset.store; offsets are stored/committed manually later */
res = rd_kafka_conf_set(kafka_conf, "enable.auto.offset.store", "false",
errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
flb_plg_error(ins, "Failed to set up enable.auto.offset.store: %s, val = %s",
rd_kafka_err2str(err), "false");
goto init_error;
}

if (!ctx->auto_commit) {
res = rd_kafka_conf_set(kafka_conf, "enable.auto.commit", "false",
errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
flb_plg_error(ins, "Failed to set up enable.auto.commit : %s, val = %s",
rd_kafka_err2str(err), "false");
goto init_error;
}
}
}
else {
ctx->polling_threshold = FLB_IN_KAFKA_UNLIMITED;
@@ -402,6 +498,12 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, format_str),
"Set the data format which will be used for parsing records."
},
{
FLB_CONFIG_MAP_STR, "parser", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, parser_name),
"Set the data parser which will be used for \
parsing kafka message payload."
},
{
FLB_CONFIG_MAP_STR, "brokers", (char *)NULL,
0, FLB_FALSE, 0,
@@ -428,6 +530,13 @@
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size),
"Set the maximum size of chunk"
},
{
FLB_CONFIG_MAP_BOOL, "auto_commit", false,
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, auto_commit),
"Set if enable the auto_commit \
Set it to true when the throughtput \
is the game"
},
/* EOF */
{0}
};
3 changes: 3 additions & 0 deletions plugins/in_kafka/in_kafka.h
@@ -48,6 +48,9 @@ struct flb_in_kafka_config {
int coll_fd;
size_t buffer_max_size; /* Maximum size of chunk allocation */
size_t polling_threshold;
flb_sds_t parser_name; /* Name of the parser */
struct flb_parser *parser; /* Parser */
int auto_commit; /* Auto commit switch (FLB_CONFIG_MAP_BOOL maps to int) */
};

#endif
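
For reference, a minimal configuration sketch that exercises the two new options added by this change; the broker address, topic name, and the json_parser parser definition are placeholders, not part of this patch.

[PARSER]
    Name        json_parser
    Format      json

[INPUT]
    Name        kafka
    brokers     kafka-broker:9092
    topics      my_topic
    parser      json_parser
    auto_commit true

With auto_commit left at its default of false, the collect loop commits each message's offset synchronously after it is encoded; with it enabled, offsets are only stored and librdkafka's background auto commit flushes them.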