diff --git a/include/fluent-bit/flb_msgpack_append_message.h b/include/fluent-bit/flb_msgpack_append_message.h index 5d821f27457..d95aaec51ab 100644 --- a/include/fluent-bit/flb_msgpack_append_message.h +++ b/include/fluent-bit/flb_msgpack_append_message.h @@ -36,4 +36,12 @@ int flb_msgpack_append_message_to_record(char **result_buffer, char *message_buffer, size_t message_size, int message_type); + +int flb_msgpack_append_map_to_record(char **result_buffer, + size_t *result_size, + flb_sds_t message_key_name, + char *base_object_buffer, + size_t base_object_size, + char *map_data, + size_t map_size); #endif diff --git a/plugins/filter_parser/filter_parser.c b/plugins/filter_parser/filter_parser.c index 3bd5bf97d9c..b0d2c7087cb 100644 --- a/plugins/filter_parser/filter_parser.c +++ b/plugins/filter_parser/filter_parser.c @@ -28,11 +28,11 @@ #include #include #include +#include #include #include #include -#include #include "filter_parser.h" @@ -186,6 +186,8 @@ static int cb_parser_filter(const void *data, size_t bytes, int key_len; const char *val_str; int val_len; + char *parsed_buf; + size_t parsed_size; char *out_buf; size_t out_size; struct flb_time parsed_time; @@ -229,6 +231,7 @@ static int cb_parser_filter(const void *data, size_t bytes, &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { out_buf = NULL; + parsed_buf = NULL; append_arr_i = 0; flb_time_copy(&tm, &log_event.timestamp); @@ -276,7 +279,7 @@ static int cb_parser_filter(const void *data, size_t bytes, flb_time_zero(&parsed_time); parse_ret = flb_parser_do(fp->parser, val_str, val_len, - (void **) &out_buf, &out_size, + (void **) &parsed_buf, &parsed_size, &parsed_time); if (parse_ret >= 0) { /* @@ -320,13 +323,13 @@ static int cb_parser_filter(const void *data, size_t bytes, &log_encoder, log_event.metadata); } - if (out_buf != NULL) { + if (parsed_buf != NULL) { + if (ctx->reserve_data) { char *new_buf = NULL; int new_size; int ret; - - ret = flb_msgpack_expand_map(out_buf, out_size, + ret = flb_msgpack_expand_map(parsed_buf, parsed_size, append_arr, append_arr_len, &new_buf, &new_size); if (ret == -1) { @@ -339,6 +342,30 @@ static int cb_parser_filter(const void *data, size_t bytes, return FLB_FILTER_NOTOUCH; } + out_buf = new_buf; + out_size = new_size; + } + else { + out_buf = strdup(parsed_buf); + out_size = parsed_size; + } + if (ctx->hash_value_field) { + char *new_buf = NULL; + size_t new_size; + int ret; + ret = flb_msgpack_append_map_to_record(&new_buf, &new_size, + flb_sds_create("parsed"), + out_buf, out_size, + parsed_buf,parsed_size); + if ( ret != FLB_MAP_EXPAND_SUCCESS){ + flb_plg_error(ctx->ins, "cannot append parsed entry to record"); + + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + flb_free(append_arr); + + return FLB_FILTER_NOTOUCH; + } flb_free(out_buf); out_buf = new_buf; out_size = new_size; @@ -351,6 +378,7 @@ static int cb_parser_filter(const void *data, size_t bytes, } flb_free(out_buf); + flb_free(parsed_buf); ret = FLB_FILTER_MODIFIED; } else { diff --git a/src/flb_msgpack_append_message.c b/src/flb_msgpack_append_message.c index 75df33b5c7a..b497f99ffde 100644 --- a/src/flb_msgpack_append_message.c +++ b/src/flb_msgpack_append_message.c @@ -80,3 +80,61 @@ int flb_msgpack_append_message_to_record(char **result_buffer, return result; } + +int flb_msgpack_append_map_to_record(char **result_buffer, + size_t *result_size, + flb_sds_t message_key_name, + char *base_object_buffer, + size_t base_object_size, + char *map_data, + size_t map_size) +{ + msgpack_unpacked unpacker; + msgpack_object_kv *new_map_entries[1]; + msgpack_object_kv message_entry; + char *modified_data_buffer; + int modified_data_size; + size_t off = 0; + int i; + int result = FLB_MAP_NOT_MODIFIED; + *result_buffer = NULL; + *result_size = 0; + + if (message_key_name == NULL || map_data == NULL){ + return result; + } + + new_map_entries[0] = &message_entry; + + message_entry.key.type = MSGPACK_OBJECT_STR; + message_entry.key.via.str.size = flb_sds_len(message_key_name); + message_entry.key.via.str.ptr = message_key_name; + + msgpack_unpacked_init(&unpacker); + if ((i=msgpack_unpack_next(&unpacker, map_data, map_size, &off)) != + MSGPACK_UNPACK_SUCCESS ) { + msgpack_unpacked_destroy(&unpacker); + return FLB_MAP_EXPANSION_ERROR; + } + if (unpacker.data.type != MSGPACK_OBJECT_MAP) { + msgpack_unpacked_destroy(&unpacker); + return FLB_MAP_EXPANSION_ERROR; + } + + message_entry.val = unpacker.data; + result = flb_msgpack_expand_map(base_object_buffer, + base_object_size, + new_map_entries, 1, + &modified_data_buffer, + &modified_data_size); + if (result == 0) { + result = FLB_MAP_EXPAND_SUCCESS; + *result_buffer = modified_data_buffer; + *result_size = modified_data_size; + } + else { + result = FLB_MAP_EXPANSION_ERROR; + } + + return result; +} \ No newline at end of file