Skip to content

Commit

Permalink
in_kafka: Handle limit of buffer for fs chunks with buffer_chunk_size…
Browse files Browse the repository at this point in the history
… config parameter

Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Nov 15, 2023
1 parent 7df6cdb commit e3add26
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
27 changes: 21 additions & 6 deletions plugins/in_kafka/in_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ static int in_kafka_collect(struct flb_input_instance *ins,
/* TO-DO: commit the record based on `ret` */
rd_kafka_commit(ctx->kafka.rk, NULL, 0);

/* Break from the loop when reaching the buf_limit if available */
if (ctx->ins->mem_buf_limit > 0 &&
ctx->log_encoder->output_length > ctx->ins->mem_buf_limit + 512) {
/* Break from the loop when reaching the limit of polling if available */
if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED &&
ctx->log_encoder->output_length > ctx->polling_threshold + 512) {
break;
}
}
Expand Down Expand Up @@ -241,8 +241,15 @@ static int in_kafka_init(struct flb_input_instance *ins,
goto init_error;
}

if (ctx->ins->mem_buf_limit > 0) {
snprintf(conf_val, sizeof(conf_val), "%zu", ctx->ins->mem_buf_limit - 512);
if (ctx->ins->mem_buf_limit > 0 || ctx->buffer_chunk_size > 0) {
if (ctx->ins->mem_buf_limit) {
ctx->polling_threshold = ctx->ins->mem_buf_limit;
}
else if (ctx->buffer_chunk_size > 0) {
ctx->polling_threshold = ctx->buffer_chunk_size;
}

snprintf(conf_val, sizeof(conf_val), "%zu", ctx->polling_threshold - 512);
res = rd_kafka_conf_set(kafka_conf, "fetch.max.bytes", conf_val,
errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
Expand All @@ -251,7 +258,7 @@ static int in_kafka_init(struct flb_input_instance *ins,
goto init_error;
}

snprintf(conf_val, sizeof(conf_val), "%zu", ctx->ins->mem_buf_limit);
snprintf(conf_val, sizeof(conf_val), "%zu", ctx->polling_threshold);
res = rd_kafka_conf_set(kafka_conf, "receive.message.max.bytes", conf_val,
errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
Expand All @@ -260,6 +267,9 @@ static int in_kafka_init(struct flb_input_instance *ins,
goto init_error;
}
}
else {
ctx->polling_threshold = FLB_IN_KAFKA_UNLIMITED;
}

ctx->kafka.rk = rd_kafka_new(RD_KAFKA_CONSUMER, kafka_conf, errstr,
sizeof(errstr));
Expand Down Expand Up @@ -416,6 +426,11 @@ static struct flb_config_map config_map[] = {
0, FLB_FALSE, 0,
"Set the librdkafka options"
},
{
FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_chunk_size),
"Set the chunk size"
},
/* EOF */
{0}
};
Expand Down
3 changes: 3 additions & 0 deletions plugins/in_kafka/in_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#define FLB_IN_KAFKA_DEFAULT_POLL_MS "500"
#define FLB_IN_KAFKA_DEFAULT_FORMAT "none"
#define FLB_IN_KAFKA_UNLIMITED (size_t)-1

enum {
FLB_IN_KAFKA_FORMAT_NONE,
Expand All @@ -44,6 +45,8 @@ struct flb_in_kafka_config {
int format;
char *format_str;
int coll_fd;
size_t buffer_chunk_size; /* Chunk allocation size */
size_t polling_threshold;
};

#endif

0 comments on commit e3add26

Please sign in to comment.