diff --git a/plugins/out_kafka/kafka.c b/plugins/out_kafka/kafka.c index 9379da84724..c860df67ad4 100644 --- a/plugins/out_kafka/kafka.c +++ b/plugins/out_kafka/kafka.c @@ -26,6 +26,7 @@ #include "kafka_config.h" #include "kafka_topic.h" + void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { @@ -73,6 +74,8 @@ static int cb_kafka_init(struct flb_output_instance *ins, { struct flb_out_kafka *ctx; + flb_plg_info(ins, "Starting kafka output init"); + /* Configuration */ ctx = flb_out_kafka_create(ins, config); if (!ctx) { @@ -85,6 +88,26 @@ static int cb_kafka_init(struct flb_output_instance *ins, return 0; } +int flb_msgpack_get_map_value(struct flb_out_kafka *ctx, msgpack_object *map, const char *key, msgpack_object **val) +{ + if (map->type != MSGPACK_OBJECT_MAP) { + flb_error("[flb_msgpack_get_map_value] Map expected"); + return -1; + } + + size_t i; + for (i = 0; i < map->via.map.size; ++i) { + if (map->via.map.ptr[i].key.type == MSGPACK_OBJECT_STR && + strncmp(map->via.map.ptr[i].key.via.str.ptr, key, map->via.map.ptr[i].key.via.str.size) == 0) { + *val = &map->via.map.ptr[i].val; + flb_debug("key matches a field in the message"); + return 0; + } + } + + return -1; // Key not found +} + int produce_message(struct flb_time *tm, msgpack_object *map, struct flb_out_kafka *ctx, struct flb_config *config) { @@ -106,6 +129,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map, msgpack_object key; msgpack_object val; flb_sds_t s; + rd_kafka_headers_t *kafka_headers = NULL; #ifdef FLB_HAVE_AVRO_ENCODER // used to flag when a buffer needs to be freed for avro @@ -155,6 +179,70 @@ int produce_message(struct flb_time *tm, msgpack_object *map, msgpack_pack_str(&mp_pck, ctx->timestamp_key_len); msgpack_pack_str_body(&mp_pck, ctx->timestamp_key, ctx->timestamp_key_len); + + /* Check if headers are provided in the configuration */ + if (ctx->headers) { + + flb_debug("setting message headers"); + /* Setting headers list size */ + int size_headers = 0; + struct mk_list *tmp; + struct mk_list *head2; + struct flb_config_map_val *mv; + struct flb_slist_entry *hkey = NULL; + struct flb_slist_entry *hval = NULL; + + /* Calculate the number of headers */ + mk_list_foreach_safe(head2, tmp, ctx->headers) { + size_headers++; + } + + /* Create Kafka headers object */ + kafka_headers = rd_kafka_headers_new(size_headers); + + /* Add headers from configuration */ + flb_config_map_foreach(head2, mv, ctx->headers) { + hkey = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); + hval = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head); + + flb_debug("found header %s with value %s", hkey->str, hval->str); + + /* Extract the message field value */ + char *field_name = NULL; + size_t field_len = flb_sds_len(hval->str); + field_name = flb_malloc(field_len); // Allocate memory for field name + if (!field_name) { + flb_errno(); + return -1; + } + + memcpy(field_name, hval->str, field_len); // Copy field name + /* Check if the header value is a message field */ + if (field_name[0] == '<' ) { + flb_debug("header %s is part of the msg, field name : %s", hkey->str, hval->str); + msgpack_object *field_value = NULL; + if (flb_msgpack_get_map_value(ctx, map, field_name + 1, &field_value) == 0 && + field_value->type == MSGPACK_OBJECT_STR) { + rd_kafka_header_add(kafka_headers, hkey->str, flb_sds_len(hkey->str), + field_value->via.str.ptr, field_value->via.str.size); + } + else { + flb_warn("Field '%s' not found or not a string value", field_name); + } + + flb_free(field_name); // Free allocated memory + } + else { + /* Static header value */ + rd_kafka_header_add(kafka_headers, hkey->str, flb_sds_len(hkey->str), + hval->str, flb_sds_len(hval->str)); + } + } + } + else { + flb_debug("no header set"); + } + switch (ctx->timestamp_format) { case FLB_JSON_DATE_DOUBLE: msgpack_pack_double(&mp_pck, flb_time_to_double(tm)); @@ -221,7 +309,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map, if (ctx->dynamic_topic) { /* Only if default topic is set and this topicname is not set for this message */ if (strncmp(topic->name, flb_kafka_topic_default(ctx)->name, val.via.str.size) == 0 && - (strncmp(topic->name, val.via.str.ptr, val.via.str.size) != 0) ) { + (strncmp(topic->name, val.via.str.ptr, val.via.str.size) != 0) ) { if (memchr(val.via.str.ptr, ',', val.via.str.size)) { /* Don't allow commas in kafkatopic name */ flb_warn("',' not allowed in dynamic_kafka topic names"); @@ -392,12 +480,22 @@ int produce_message(struct flb_time *tm, msgpack_object *map, return FLB_RETRY; } - ret = rd_kafka_produce(topic->tp, - RD_KAFKA_PARTITION_UA, - RD_KAFKA_MSG_F_COPY, - out_buf, out_size, - message_key, message_key_len, - ctx); + rd_kafka_resp_err_t err = rd_kafka_producev(ctx->kafka.rk, + RD_KAFKA_V_TOPIC(rd_kafka_topic_name(topic->tp)), + RD_KAFKA_V_HEADERS(kafka_headers), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_VALUE(out_buf, out_size), + RD_KAFKA_V_KEY(message_key, message_key_len), + RD_KAFKA_V_END); + + + + + if (err == RD_KAFKA_RESP_ERR_NO_ERROR) { + flb_plg_info(ctx->ins, "Sending message completed"); + ret = 0; + } + if (ret == -1) { flb_error( @@ -455,7 +553,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map, AVRO_FREE(avro_fast_buffer, out_buf) } #endif - + msgpack_sbuffer_destroy(&mp_sbuf); return FLB_OK; } @@ -622,6 +720,12 @@ static struct flb_config_map config_map[] = { 0, FLB_FALSE, 0, "Set the kafka topics, delimited by commas." }, + { + FLB_CONFIG_MAP_SLIST_1, "header", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_out_kafka, headers), + "Add a kafka message header key/value pair. Multiple headers can be set" + }, + { FLB_CONFIG_MAP_STR, "brokers", (char *)NULL, 0, FLB_FALSE, 0, @@ -647,6 +751,8 @@ static struct flb_config_map config_map[] = { {0} }; + + struct flb_output_plugin out_kafka_plugin = { .name = "kafka", .description = "Kafka", diff --git a/plugins/out_kafka/kafka_config.h b/plugins/out_kafka/kafka_config.h index 42af378e161..7cf1c0522ec 100644 --- a/plugins/out_kafka/kafka_config.h +++ b/plugins/out_kafka/kafka_config.h @@ -86,6 +86,9 @@ struct flb_out_kafka { /* Head of defined topics by configuration */ struct mk_list topics; + /* Headers map defined by configuration*/ + struct mk_list *headers; + /* * Blocked Status: since rdkafka have it own buffering queue, there is a * chance that the queue becomes full, when that happens our default