Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_kafka: Add dynamic/static headers support #8583

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 116 additions & 8 deletions plugins/out_kafka/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "kafka_config.h"
#include "kafka_topic.h"


void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
void *opaque)
{
Expand Down Expand Up @@ -73,6 +74,8 @@ static int cb_kafka_init(struct flb_output_instance *ins,
{
struct flb_out_kafka *ctx;

flb_plg_info(ins, "Starting kafka output init");

/* Configuration */
ctx = flb_out_kafka_create(ins, config);
if (!ctx) {
Expand All @@ -85,6 +88,26 @@ static int cb_kafka_init(struct flb_output_instance *ins,
return 0;
}

int flb_msgpack_get_map_value(struct flb_out_kafka *ctx, msgpack_object *map, const char *key, msgpack_object **val)
{
if (map->type != MSGPACK_OBJECT_MAP) {
flb_error("[flb_msgpack_get_map_value] Map expected");
return -1;
}

size_t i;
for (i = 0; i < map->via.map.size; ++i) {
if (map->via.map.ptr[i].key.type == MSGPACK_OBJECT_STR &&
strncmp(map->via.map.ptr[i].key.via.str.ptr, key, map->via.map.ptr[i].key.via.str.size) == 0) {
*val = &map->via.map.ptr[i].val;
flb_debug("key matches a field in the message");
return 0;
}
}

return -1; // Key not found
}

int produce_message(struct flb_time *tm, msgpack_object *map,
struct flb_out_kafka *ctx, struct flb_config *config)
{
Expand All @@ -106,6 +129,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
msgpack_object key;
msgpack_object val;
flb_sds_t s;
rd_kafka_headers_t *kafka_headers = NULL;

#ifdef FLB_HAVE_AVRO_ENCODER
// used to flag when a buffer needs to be freed for avro
Expand Down Expand Up @@ -155,6 +179,72 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
msgpack_pack_str(&mp_pck, ctx->timestamp_key_len);
msgpack_pack_str_body(&mp_pck,
ctx->timestamp_key, ctx->timestamp_key_len);

/* Check if headers are provided in the configuration */
if (ctx->headers) {

flb_debug("setting message headers");
/* Setting headers list size */
int size_headers = 0;
struct mk_list *tmp;
struct mk_list *head2;
struct flb_config_map_val *mv;
struct flb_slist_entry *hkey = NULL;
struct flb_slist_entry *hval = NULL;

/* Calculate the number of headers */
mk_list_foreach_safe(head2, tmp, ctx->headers) {
size_headers++;
}

/* Create Kafka headers object */
kafka_headers = rd_kafka_headers_new(size_headers);

/* Add headers from configuration */
flb_config_map_foreach(head2, mv, ctx->headers) {
hkey = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
hval = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);

flb_debug("found header %s with value %s", hkey->str, hval->str);

/* Extract the message field value */
char *field_name = NULL;
size_t field_len = flb_sds_len(hval->str);
field_name = flb_malloc(field_len); // Allocate memory for field name
if (!field_name) {
flb_errno();
return -1;
}

memcpy(field_name, hval->str, field_len); // Copy field name
/* Check if the header value is a message field */
if (field_name[0] == '<' ) {
flb_debug("header %s is part of the msg, field name : %s", hkey->str, hval->str);
msgpack_object *field_value = NULL;
if (flb_msgpack_get_map_value(ctx, map, field_name + 1, &field_value) == 0 &&
field_value->type == MSGPACK_OBJECT_STR) {
rd_kafka_header_add(kafka_headers, hkey->str, flb_sds_len(hkey->str),
field_value->via.str.ptr, field_value->via.str.size);
}
else {
flb_warn("Field '%s' not found or not a string value", field_name);
}


}
else {
/* Static header value */
rd_kafka_header_add(kafka_headers, hkey->str, flb_sds_len(hkey->str),
hval->str, flb_sds_len(hval->str));
}

flb_free(field_name); // Free allocated memory
}
}
else {
flb_debug("no header set");
}

switch (ctx->timestamp_format) {
case FLB_JSON_DATE_DOUBLE:
msgpack_pack_double(&mp_pck, flb_time_to_double(tm));
Expand Down Expand Up @@ -221,7 +311,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
if (ctx->dynamic_topic) {
/* Only if default topic is set and this topicname is not set for this message */
if (strncmp(topic->name, flb_kafka_topic_default(ctx)->name, val.via.str.size) == 0 &&
(strncmp(topic->name, val.via.str.ptr, val.via.str.size) != 0) ) {
(strncmp(topic->name, val.via.str.ptr, val.via.str.size) != 0) ) {
if (memchr(val.via.str.ptr, ',', val.via.str.size)) {
/* Don't allow commas in kafkatopic name */
flb_warn("',' not allowed in dynamic_kafka topic names");
Expand Down Expand Up @@ -392,12 +482,22 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
return FLB_RETRY;
}

ret = rd_kafka_produce(topic->tp,
RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY,
out_buf, out_size,
message_key, message_key_len,
ctx);
rd_kafka_resp_err_t err = rd_kafka_producev(ctx->kafka.rk,
RD_KAFKA_V_TOPIC(rd_kafka_topic_name(topic->tp)),
RD_KAFKA_V_HEADERS(kafka_headers),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(out_buf, out_size),
RD_KAFKA_V_KEY(message_key, message_key_len),
RD_KAFKA_V_END);




if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
flb_plg_info(ctx->ins, "Sending message completed");
ret = 0;
}


if (ret == -1) {
flb_error(
Expand Down Expand Up @@ -455,7 +555,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
AVRO_FREE(avro_fast_buffer, out_buf)
}
#endif

msgpack_sbuffer_destroy(&mp_sbuf);
return FLB_OK;
}
Expand Down Expand Up @@ -622,6 +722,12 @@ static struct flb_config_map config_map[] = {
0, FLB_FALSE, 0,
"Set the kafka topics, delimited by commas."
},
{
FLB_CONFIG_MAP_SLIST_1, "header", NULL,
FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_out_kafka, headers),
"Add a kafka message header key/value pair. Multiple headers can be set"
},

{
FLB_CONFIG_MAP_STR, "brokers", (char *)NULL,
0, FLB_FALSE, 0,
Expand All @@ -647,6 +753,8 @@ static struct flb_config_map config_map[] = {
{0}
};



struct flb_output_plugin out_kafka_plugin = {
.name = "kafka",
.description = "Kafka",
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_kafka/kafka_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ struct flb_out_kafka {
/* Head of defined topics by configuration */
struct mk_list topics;

/* Headers map defined by configuration*/
struct mk_list *headers;

/*
* Blocked Status: since rdkafka have it own buffering queue, there is a
* chance that the queue becomes full, when that happens our default
Expand Down
Loading