From c021e35087b53c6632e00fe599b77802019d70b1 Mon Sep 17 00:00:00 2001 From: Athish Pranav D Date: Mon, 12 Aug 2024 22:10:36 +0530 Subject: [PATCH 1/6] Add arg for hash-value-field Signed-off-by: Athish Pranav D --- plugins/filter_parser/filter_parser.c | 6 ++++++ plugins/filter_parser/filter_parser.h | 1 + 2 files changed, 7 insertions(+) diff --git a/plugins/filter_parser/filter_parser.c b/plugins/filter_parser/filter_parser.c index 11bf71f7e53..b393a7e3995 100644 --- a/plugins/filter_parser/filter_parser.c +++ b/plugins/filter_parser/filter_parser.c @@ -32,6 +32,7 @@ #include #include +#include #include "filter_parser.h" @@ -437,6 +438,11 @@ static struct flb_config_map config_map[] = { "Keep all other original fields in the parsed result. " "If false, all other original fields will be removed." }, + { + FLB_CONFIG_MAP_BOOL, "Hash_Value_Field", "false", + 0, FLB_TRUE, offsetof(struct filter_parser_ctx, hash_value_field), + "Stores the parsed values as a hash value in a field with key `parsed`. 
" + }, { FLB_CONFIG_MAP_DEPRECATED, "Unescape_key", NULL, 0, FLB_FALSE, 0, diff --git a/plugins/filter_parser/filter_parser.h b/plugins/filter_parser/filter_parser.h index 36b26d9ec34..9a0fc9641f9 100644 --- a/plugins/filter_parser/filter_parser.h +++ b/plugins/filter_parser/filter_parser.h @@ -35,6 +35,7 @@ struct filter_parser_ctx { int key_name_len; int reserve_data; int preserve_key; + int hash_value_field; struct mk_list parsers; struct flb_filter_instance *ins; }; From a30a976280256106b275ce9480dd2f30e979209d Mon Sep 17 00:00:00 2001 From: Athish Pranav D Date: Tue, 13 Aug 2024 11:56:49 +0530 Subject: [PATCH 2/6] Code refactoring Signed-off-by: Athish Pranav D --- .../fluent-bit/flb_msgpack_append_message.h | 9 +++ plugins/filter_parser/filter_parser.c | 39 ++++++++++-- src/flb_msgpack_append_message.c | 59 +++++++++++++++++++ 3 files changed, 103 insertions(+), 4 deletions(-) diff --git a/include/fluent-bit/flb_msgpack_append_message.h b/include/fluent-bit/flb_msgpack_append_message.h index 5d821f27457..25990c604f3 100644 --- a/include/fluent-bit/flb_msgpack_append_message.h +++ b/include/fluent-bit/flb_msgpack_append_message.h @@ -27,6 +27,7 @@ #define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3 #include +#include int flb_msgpack_append_message_to_record(char **result_buffer, size_t *result_size, @@ -36,4 +37,12 @@ int flb_msgpack_append_message_to_record(char **result_buffer, char *message_buffer, size_t message_size, int message_type); + +int flb_msgpack_append_map_to_record(char **result_buffer, + size_t *result_size, + flb_sds_t message_key_name, + char *base_object_buffer, + size_t base_object_size, + char *map_data, + size_t map_size); #endif diff --git a/plugins/filter_parser/filter_parser.c b/plugins/filter_parser/filter_parser.c index b393a7e3995..9e1a82a7288 100644 --- a/plugins/filter_parser/filter_parser.c +++ b/plugins/filter_parser/filter_parser.c @@ -28,11 +28,11 @@ #include #include #include +#include #include #include #include -#include 
#include "filter_parser.h" @@ -189,6 +189,8 @@ static int cb_parser_filter(const void *data, size_t bytes, int key_len; const char *val_str; int val_len; + char *parsed_buf; + size_t parsed_size; char *out_buf; size_t out_size; struct flb_time parsed_time; @@ -232,6 +234,7 @@ static int cb_parser_filter(const void *data, size_t bytes, &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { out_buf = NULL; + parsed_buf = NULL; append_arr_i = 0; flb_time_copy(&tm, &log_event.timestamp); @@ -279,7 +282,7 @@ static int cb_parser_filter(const void *data, size_t bytes, flb_time_zero(&parsed_time); parse_ret = flb_parser_do(fp->parser, val_str, val_len, - (void **) &out_buf, &out_size, + (void **) &parsed_buf, &parsed_size, &parsed_time); if (parse_ret >= 0) { /* @@ -323,12 +326,13 @@ static int cb_parser_filter(const void *data, size_t bytes, &log_encoder, log_event.metadata); } - if (out_buf != NULL) { + if (parsed_buf != NULL) { + if (ctx->reserve_data) { char *new_buf = NULL; int new_size; int ret; - ret = flb_msgpack_expand_map(out_buf, out_size, + ret = flb_msgpack_expand_map(parsed_buf, parsed_size, append_arr, append_arr_len, &new_buf, &new_size); if (ret == -1) { @@ -341,6 +345,32 @@ static int cb_parser_filter(const void *data, size_t bytes, return FLB_FILTER_NOTOUCH; } + out_buf = new_buf; + out_size = new_size; + } + else { + out_buf = strdup(parsed_buf); + out_size = parsed_size; + } + if (ctx->hash_value_field) { + char *new_buf = NULL; + size_t new_size; + int ret; + flb_sds_t hash_key = flb_sds_create("parsed"); + ret = flb_msgpack_append_map_to_record(&new_buf, &new_size, + hash_key, + out_buf, out_size, + parsed_buf,parsed_size); + flb_sds_destroy(hash_key); + if ( ret != FLB_MAP_EXPAND_SUCCESS){ + flb_plg_error(ctx->ins, "cannot append parsed entry to record"); + + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + flb_free(append_arr); + + return FLB_FILTER_NOTOUCH; + } flb_free(out_buf); out_buf = new_buf; 
out_size = new_size; @@ -353,6 +383,7 @@ static int cb_parser_filter(const void *data, size_t bytes, } flb_free(out_buf); + flb_free(parsed_buf); ret = FLB_FILTER_MODIFIED; } else { diff --git a/src/flb_msgpack_append_message.c b/src/flb_msgpack_append_message.c index 75df33b5c7a..f79ea4f0dff 100644 --- a/src/flb_msgpack_append_message.c +++ b/src/flb_msgpack_append_message.c @@ -80,3 +80,62 @@ int flb_msgpack_append_message_to_record(char **result_buffer, return result; } + +int flb_msgpack_append_map_to_record(char **result_buffer, + size_t *result_size, + flb_sds_t message_key_name, + char *base_object_buffer, + size_t base_object_size, + char *map_data, + size_t map_size) +{ + msgpack_unpacked unpacker; + msgpack_object_kv *new_map_entries[1]; + msgpack_object_kv message_entry; + char *modified_data_buffer; + int modified_data_size; + size_t off = 0; + int i; + int result = FLB_MAP_NOT_MODIFIED; + *result_buffer = NULL; + *result_size = 0; + + if (message_key_name == NULL || map_data == NULL){ + return result; + } + + new_map_entries[0] = &message_entry; + + message_entry.key.type = MSGPACK_OBJECT_STR; + message_entry.key.via.str.size = flb_sds_len(message_key_name); + message_entry.key.via.str.ptr = message_key_name; + + msgpack_unpacked_init(&unpacker); + if ((i=msgpack_unpack_next(&unpacker, map_data, map_size, &off)) != + MSGPACK_UNPACK_SUCCESS ) { + msgpack_unpacked_destroy(&unpacker); + return FLB_MAP_EXPANSION_ERROR; + } + if (unpacker.data.type != MSGPACK_OBJECT_MAP) { + msgpack_unpacked_destroy(&unpacker); + return FLB_MAP_EXPANSION_ERROR; + } + + message_entry.val = unpacker.data; + result = flb_msgpack_expand_map(base_object_buffer, + base_object_size, + new_map_entries, 1, + &modified_data_buffer, + &modified_data_size); + if (result == 0) { + result = FLB_MAP_EXPAND_SUCCESS; + *result_buffer = modified_data_buffer; + *result_size = modified_data_size; + } + else { + result = FLB_MAP_EXPANSION_ERROR; + } + msgpack_unpacked_destroy(&unpacker); + + 
return result; +} \ No newline at end of file From 6114210892456eff00f34dcef32f752e3f777037 Mon Sep 17 00:00:00 2001 From: Athish Pranav D Date: Sat, 24 Aug 2024 09:46:29 +0530 Subject: [PATCH 3/6] Add UTs Signed-off-by: Athish Pranav D --- tests/runtime/filter_parser.c | 84 +++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/tests/runtime/filter_parser.c b/tests/runtime/filter_parser.c index 8f25fec0e6e..49d6ddd7988 100644 --- a/tests/runtime/filter_parser.c +++ b/tests/runtime/filter_parser.c @@ -812,6 +812,89 @@ void flb_test_filter_parser_preserve_original_field() flb_destroy(ctx); } +void flb_test_filter_parser_hash_value_field() +{ + int ret; + int bytes; + char *p, *output, *expected; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int filter_ffd; + struct flb_parser *parser; + + struct flb_lib_out_cb cb; + cb.cb = callback_test; + cb.data = NULL; + + clear_output(); + + ctx = flb_create(); + + /* Configure service */ + flb_service_set(ctx, "Flush", FLUSH_INTERVAL, "Grace", "1", "Log_Level", "debug", NULL); + + /* Input */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, + "Tag", "test", + NULL); + + /* Parser */ + parser = flb_parser_create("dummy_test", "regex", "^(?[^ ]+) (?[^ ]+) (?[^ ]+) (?.+)$", + FLB_TRUE, + NULL, NULL, NULL, MK_FALSE, MK_TRUE, FLB_FALSE, FLB_FALSE, NULL, 0, + NULL, ctx->config); + TEST_CHECK(parser != NULL); + + /* Filter */ + filter_ffd = flb_filter(ctx, (char *) "parser", NULL); + TEST_CHECK(filter_ffd >= 0); + ret = flb_filter_set(ctx, filter_ffd, + "Match", "test", + "Key_Name", "data", + "Parser", "dummy_test", + "Hash_Value_Field", "On", + NULL); + TEST_CHECK(ret == 0); + + /* Output */ + out_ffd = flb_output(ctx, (char *) "lib", &cb); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "Match", "*", + "format", "json", + NULL); + + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data */ + p = 
"[1448403340,{\"data\":\"100 0.5 true This is an example\",\"log\":\"An example\"}]"; + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + + wait_with_timeout(2000, &output); /* waiting flush and ensuring data flush */ + TEST_CHECK_(output != NULL, "Expected output to not be NULL"); + if (output != NULL) { + /* check original field is preserved */ + expected = "\"parsed\":{\"INT\":\"100\",\"FLOAT\":\"0.5\",\"BOOL\":\"true\",\"STRING\":\"This is an example\"}"; + TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output); + /* check fields were extracted */ + expected = "\"INT\":\"100\",\"FLOAT\":\"0.5\",\"BOOL\":\"true\",\"STRING\":\"This is an example\""; + TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output); + /* check other fields are preserved */ + // expected = "\"log\":\"An example\""; + // TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output); + free(output); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + // https://github.com/fluent/fluent-bit/issues/2250 void flb_test_filter_parser_first_matched_when_mutilple_parser() { @@ -984,6 +1067,7 @@ TEST_LIST = { {"filter_parser_use_system_timezone", flb_test_filter_parser_use_system_timezone }, {"filter_parser_ignore_malformed_time", flb_test_filter_parser_ignore_malformed_time }, {"filter_parser_preserve_original_field", flb_test_filter_parser_preserve_original_field }, + {"filter_parser_hash_value_field", flb_test_filter_parser_hash_value_field }, {"filter_parser_first_matched_when_multiple_parser", flb_test_filter_parser_first_matched_when_mutilple_parser }, {"filter_parser_skip_empty_values_false", flb_test_filter_parser_skip_empty_values_false}, {NULL, NULL} From 96b4589b33ba1d0f92fd123a15aac20d8e0a92fc Mon Sep 17 00:00:00 2001 From: Athish Pranav D Date: Thu, 29 Aug 2024 18:21:45 +0530 Subject: [PATCH 4/6] Made the 
param configurable Signed-off-by: Athish Pranav D --- plugins/filter_parser/filter_parser.c | 11 +++++------ plugins/filter_parser/filter_parser.h | 2 +- tests/runtime/filter_parser.c | 2 +- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/plugins/filter_parser/filter_parser.c b/plugins/filter_parser/filter_parser.c index 9e1a82a7288..fc383949cb3 100644 --- a/plugins/filter_parser/filter_parser.c +++ b/plugins/filter_parser/filter_parser.c @@ -101,6 +101,7 @@ static int configure(struct filter_parser_ctx *ctx, int ret; struct mk_list *head; struct flb_kv *kv; + ctx->hash_value_field = NULL; ctx->key_name = NULL; ctx->reserve_data = FLB_FALSE; @@ -352,16 +353,14 @@ static int cb_parser_filter(const void *data, size_t bytes, out_buf = strdup(parsed_buf); out_size = parsed_size; } - if (ctx->hash_value_field) { + if (ctx->hash_value_field!=NULL) { char *new_buf = NULL; size_t new_size; int ret; - flb_sds_t hash_key = flb_sds_create("parsed"); ret = flb_msgpack_append_map_to_record(&new_buf, &new_size, - hash_key, + ctx->hash_value_field, out_buf, out_size, parsed_buf,parsed_size); - flb_sds_destroy(hash_key); if ( ret != FLB_MAP_EXPAND_SUCCESS){ flb_plg_error(ctx->ins, "cannot append parsed entry to record"); @@ -470,9 +469,9 @@ static struct flb_config_map config_map[] = { "If false, all other original fields will be removed." }, { - FLB_CONFIG_MAP_BOOL, "Hash_Value_Field", "false", + FLB_CONFIG_MAP_STR, "Hash_Value_Field", NULL, 0, FLB_TRUE, offsetof(struct filter_parser_ctx, hash_value_field), - "Stores the parsed values as a hash value in a field with key `parsed`. " + "Stores the parsed values as a hash value in a field with key given. 
" }, { FLB_CONFIG_MAP_DEPRECATED, "Unescape_key", NULL, diff --git a/plugins/filter_parser/filter_parser.h b/plugins/filter_parser/filter_parser.h index 9a0fc9641f9..0b99ee0f565 100644 --- a/plugins/filter_parser/filter_parser.h +++ b/plugins/filter_parser/filter_parser.h @@ -35,7 +35,7 @@ struct filter_parser_ctx { int key_name_len; int reserve_data; int preserve_key; - int hash_value_field; + flb_sds_t hash_value_field; struct mk_list parsers; struct flb_filter_instance *ins; }; diff --git a/tests/runtime/filter_parser.c b/tests/runtime/filter_parser.c index 49d6ddd7988..b1c2faa9aab 100644 --- a/tests/runtime/filter_parser.c +++ b/tests/runtime/filter_parser.c @@ -855,7 +855,7 @@ void flb_test_filter_parser_hash_value_field() "Match", "test", "Key_Name", "data", "Parser", "dummy_test", - "Hash_Value_Field", "On", + "Hash_Value_Field", "parsed", NULL); TEST_CHECK(ret == 0); From 0e983343232b72de0c041dcc6a1c147a1f2f4dc7 Mon Sep 17 00:00:00 2001 From: Athish Pranav D Date: Fri, 30 Aug 2024 20:16:56 +0530 Subject: [PATCH 5/6] Addressed comments Signed-off-by: Athish Pranav D --- src/flb_msgpack_append_message.c | 10 +++++----- tests/runtime/filter_parser.c | 5 +---- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/flb_msgpack_append_message.c b/src/flb_msgpack_append_message.c index f79ea4f0dff..8425d9a2a63 100644 --- a/src/flb_msgpack_append_message.c +++ b/src/flb_msgpack_append_message.c @@ -93,7 +93,7 @@ int flb_msgpack_append_map_to_record(char **result_buffer, msgpack_object_kv *new_map_entries[1]; msgpack_object_kv message_entry; char *modified_data_buffer; - int modified_data_size; + int modified_data_size; size_t off = 0; int i; int result = FLB_MAP_NOT_MODIFIED; @@ -123,10 +123,10 @@ int flb_msgpack_append_map_to_record(char **result_buffer, message_entry.val = unpacker.data; result = flb_msgpack_expand_map(base_object_buffer, - base_object_size, - new_map_entries, 1, - &modified_data_buffer, - &modified_data_size); + base_object_size, + 
new_map_entries, 1, + &modified_data_buffer, + &modified_data_size); if (result == 0) { result = FLB_MAP_EXPAND_SUCCESS; *result_buffer = modified_data_buffer; diff --git a/tests/runtime/filter_parser.c b/tests/runtime/filter_parser.c index b1c2faa9aab..93ac6203eeb 100644 --- a/tests/runtime/filter_parser.c +++ b/tests/runtime/filter_parser.c @@ -879,15 +879,12 @@ void flb_test_filter_parser_hash_value_field() wait_with_timeout(2000, &output); /* waiting flush and ensuring data flush */ TEST_CHECK_(output != NULL, "Expected output to not be NULL"); if (output != NULL) { - /* check original field is preserved */ + /* check hash value field present */ expected = "\"parsed\":{\"INT\":\"100\",\"FLOAT\":\"0.5\",\"BOOL\":\"true\",\"STRING\":\"This is an example\"}"; TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output); /* check fields were extracted */ expected = "\"INT\":\"100\",\"FLOAT\":\"0.5\",\"BOOL\":\"true\",\"STRING\":\"This is an example\""; TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output); - /* check other fields are preserved */ - // expected = "\"log\":\"An example\""; - // TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output); free(output); } From 6b35eb96fdf44c1b13d478609f7d09421f511361 Mon Sep 17 00:00:00 2001 From: Athish Pranav D Date: Mon, 2 Sep 2024 14:38:23 +0530 Subject: [PATCH 6/6] Add separate err msg for Unpack issue Signed-off-by: Athish Pranav D --- include/fluent-bit/flb_msgpack_append_message.h | 1 + plugins/filter_parser/filter_parser.c | 4 ++-- src/flb_msgpack_append_message.c | 9 +++++---- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/include/fluent-bit/flb_msgpack_append_message.h b/include/fluent-bit/flb_msgpack_append_message.h index 25990c604f3..b2c5049febf 100644 --- a/include/fluent-bit/flb_msgpack_append_message.h +++ 
b/include/fluent-bit/flb_msgpack_append_message.h @@ -25,6 +25,7 @@ #define FLB_MAP_NOT_MODIFIED -1 #define FLB_MAP_EXPANSION_ERROR -2 #define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3 +#define FLB_MSGPACK_UNPACK_ERROR -4 #include #include diff --git a/plugins/filter_parser/filter_parser.c b/plugins/filter_parser/filter_parser.c index fc383949cb3..c956700911e 100644 --- a/plugins/filter_parser/filter_parser.c +++ b/plugins/filter_parser/filter_parser.c @@ -101,11 +101,11 @@ static int configure(struct filter_parser_ctx *ctx, int ret; struct mk_list *head; struct flb_kv *kv; - ctx->hash_value_field = NULL; ctx->key_name = NULL; ctx->reserve_data = FLB_FALSE; ctx->preserve_key = FLB_FALSE; + ctx->hash_value_field = NULL; mk_list_init(&ctx->parsers); if (flb_filter_config_map_set(f_ins, ctx) < 0) { @@ -353,7 +353,7 @@ static int cb_parser_filter(const void *data, size_t bytes, out_buf = strdup(parsed_buf); out_size = parsed_size; } - if (ctx->hash_value_field!=NULL) { + if (ctx->hash_value_field != NULL) { char *new_buf = NULL; size_t new_size; int ret; diff --git a/src/flb_msgpack_append_message.c b/src/flb_msgpack_append_message.c index 8425d9a2a63..a2217617a47 100644 --- a/src/flb_msgpack_append_message.c +++ b/src/flb_msgpack_append_message.c @@ -111,14 +111,15 @@ int flb_msgpack_append_map_to_record(char **result_buffer, message_entry.key.via.str.ptr = message_key_name; msgpack_unpacked_init(&unpacker); - if ((i=msgpack_unpack_next(&unpacker, map_data, map_size, &off)) != - MSGPACK_UNPACK_SUCCESS ) { + if ((i = msgpack_unpack_next(&unpacker, + map_data, + map_size, &off)) != MSGPACK_UNPACK_SUCCESS) { msgpack_unpacked_destroy(&unpacker); - return FLB_MAP_EXPANSION_ERROR; + return FLB_MSGPACK_UNPACK_ERROR; } if (unpacker.data.type != MSGPACK_OBJECT_MAP) { msgpack_unpacked_destroy(&unpacker); - return FLB_MAP_EXPANSION_ERROR; + return FLB_MSGPACK_UNPACK_ERROR; } message_entry.val = unpacker.data;