From cd8792d1d5dbee03f38c855d533d851dd9344884 Mon Sep 17 00:00:00 2001 From: Sindhu Karri Date: Thu, 16 May 2024 12:58:44 +0000 Subject: [PATCH 1/2] Fix Fluent-bit issue Potential log loss during high load at Multiline and Rewrite Tag Filter --- SPECS/fluent-bit/fluent-bit.spec | 6 +- .../in_emitter_fix_issue_8198.patch | 661 ++++++++++++++++++ 2 files changed, 666 insertions(+), 1 deletion(-) create mode 100644 SPECS/fluent-bit/in_emitter_fix_issue_8198.patch diff --git a/SPECS/fluent-bit/fluent-bit.spec b/SPECS/fluent-bit/fluent-bit.spec index a475a47f4de..51d5cf5e4d3 100644 --- a/SPECS/fluent-bit/fluent-bit.spec +++ b/SPECS/fluent-bit/fluent-bit.spec @@ -1,12 +1,13 @@ Summary: Fast and Lightweight Log processor and forwarder for Linux, BSD and OSX Name: fluent-bit Version: 2.2.2 -Release: 1%{?dist} +Release: 2%{?dist} License: Apache-2.0 Vendor: Microsoft Corporation Distribution: Mariner URL: https://fluentbit.io Source0: https://github.com/fluent/%{name}/archive/refs/tags/v%{version}.tar.gz#/%{name}-%{version}.tar.gz +Patch0: in_emitter_fix_issue_8198.patch BuildRequires: bison BuildRequires: cmake BuildRequires: cyrus-sasl-devel @@ -80,6 +81,9 @@ Development files for %{name} %{_libdir}/fluent-bit/*.so %changelog +* Thu May 16 2024 Sindhu Karri - 2.2.2-2 +- Apply patch in_emitter_fix_issue_8198.patch to fix #8198 ( Potential log loss during high load at Multiline & Rewrite Tag Filter (in_emitter) ) + * Wed Apr 03 2024 CBL-Mariner Servicing Account - 2.2.2-1 - Auto-upgrade to 2.2.2 - CVE-2024-23722 diff --git a/SPECS/fluent-bit/in_emitter_fix_issue_8198.patch b/SPECS/fluent-bit/in_emitter_fix_issue_8198.patch new file mode 100644 index 00000000000..d9861ab126d --- /dev/null +++ b/SPECS/fluent-bit/in_emitter_fix_issue_8198.patch @@ -0,0 +1,661 @@ +From feb424367d08666dd9fb0a6405f05c19b6678873 Mon Sep 17 00:00:00 2001 +From: Richard Treu +Date: Fri, 9 Feb 2024 23:46:32 +0100 +Subject: [PATCH 1/6] in_emitter: Fix to prevent single record chunks and do + pause on
mem_buf_limit + +The current code creates a situation, where only one record per chunk + is created. In case of a non-existing ring-buffer, the old mechanism is used. + +Also the in_emitter plugin continued to accept records even after the +set emitter_mem_buf_limit was reached. This commit implements a +check if the plugin was paused and returns accordingly. + +Signed-off-by: Richard Treu +--- + plugins/in_emitter/emitter.c | 67 +++++++++++++++++++++++++++++++++--- + 1 file changed, 62 insertions(+), 5 deletions(-) + +diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c +index 62886d1346c..532a629b924 100644 +--- a/plugins/in_emitter/emitter.c ++++ b/plugins/in_emitter/emitter.c +@@ -31,6 +31,9 @@ + + #define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000 + ++/* return values */ ++#define FLB_EMITTER_BUSY 3 ++ + struct em_chunk { + flb_sds_t tag; + struct msgpack_sbuffer mp_sbuf; /* msgpack sbuffer */ +@@ -39,6 +42,7 @@ struct em_chunk { + }; + + struct flb_emitter { ++ int coll_fd; /* collector id */ + struct mk_list chunks; /* list of all pending chunks */ + struct flb_input_instance *ins; /* input instance */ + struct flb_ring_buffer *msgs; /* ring buffer for cross-thread messages */ +@@ -97,7 +101,6 @@ int static do_in_emitter_add_record(struct em_chunk *ec, + em_chunk_destroy(ec); + return -1; + } +- /* Release the echunk */ + em_chunk_destroy(ec); + return 0; + } +@@ -118,6 +121,12 @@ int in_emitter_add_record(const char *tag, int tag_len, + ctx = (struct flb_emitter *) in->context; + ec = NULL; + ++ /* Restricted by mem_buf_limit */ ++ if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { ++ flb_plg_debug(ctx->ins, "emitter memory buffer limit reached. 
Not accepting record."); ++ return FLB_EMITTER_BUSY; ++ } ++ + /* Use the ring buffer first if it exists */ + if (ctx->msgs) { + memset(&temporary_chunk, 0, sizeof(struct em_chunk)); +@@ -161,8 +170,7 @@ int in_emitter_add_record(const char *tag, int tag_len, + + /* Append raw msgpack data */ + msgpack_sbuffer_write(&ec->mp_sbuf, buf_data, buf_size); +- +- return do_in_emitter_add_record(ec, in); ++ return 0; + } + + /* +@@ -191,6 +199,34 @@ static int in_emitter_ingest_ring_buffer(struct flb_input_instance *in, + return ret; + } + ++static int cb_queue_chunks(struct flb_input_instance *in, ++ struct flb_config *config, void *data) ++{ ++ int ret; ++ struct mk_list *tmp; ++ struct mk_list *head; ++ struct em_chunk *echunk; ++ struct flb_emitter *ctx; ++ ++ /* Get context */ ++ ctx = (struct flb_emitter *) data; ++ ++ /* Try to enqueue chunks under our limits */ ++ mk_list_foreach_safe(head, tmp, &ctx->chunks) { ++ echunk = mk_list_entry(head, struct em_chunk, _head); ++ ++ /* Associate this backlog chunk to this instance into the engine */ ++ ret = do_in_emitter_add_record(echunk, in); ++ if (ret == -1) { ++ flb_error("[in_emitter] error registering chunk with tag: %s", ++ echunk->tag); ++ continue; ++ } ++ } ++ ++ return 0; ++} ++ + static int in_emitter_start_ring_buffer(struct flb_input_instance *in, struct flb_emitter *ctx) + { + if (ctx->ring_buffer_size <= 0) { +@@ -257,6 +293,15 @@ static int cb_emitter_init(struct flb_input_instance *in, + return -1; + } + } ++ else{ ++ ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 50000000, config); ++ if (ret < 0) { ++ flb_error("[in_emitter] could not create collector"); ++ flb_free(ctx); ++ return -1; ++ } ++ ctx->coll_fd = ret; ++ } + + /* export plugin context */ + flb_input_set_context(in, ctx); +@@ -264,6 +309,18 @@ static int cb_emitter_init(struct flb_input_instance *in, + return 0; + } + ++static void cb_emitter_pause(void *data, struct flb_config *config) ++{ ++ struct flb_emitter *ctx = data; ++ 
flb_input_collector_pause(ctx->coll_fd, ctx->ins); ++} ++ ++static void cb_emitter_resume(void *data, struct flb_config *config) ++{ ++ struct flb_emitter *ctx = data; ++ flb_input_collector_resume(ctx->coll_fd, ctx->ins); ++} ++ + static int cb_emitter_exit(void *data, struct flb_config *config) + { + struct mk_list *tmp; +@@ -312,8 +369,8 @@ struct flb_input_plugin in_emitter_plugin = { + .cb_ingest = NULL, + .cb_flush_buf = NULL, + .config_map = config_map, +- .cb_pause = NULL, +- .cb_resume = NULL, ++ .cb_pause = cb_emitter_pause, ++ .cb_resume = cb_emitter_resume, + .cb_exit = cb_emitter_exit, + + /* This plugin can only be configured and invoked by the Engine only */ + +From 37826b66b29d1ad867d220313178c3feac9b792a Mon Sep 17 00:00:00 2001 +From: Richard Treu +Date: Thu, 11 Apr 2024 23:53:10 +0200 +Subject: [PATCH 2/6] filter_multiline: Pause source input plugins on filter + pause This commit will pause the inputs (sending to multiline) to not loose + any in-flight records. + +Signed-off-by: Richard Treu +--- + plugins/filter_multiline/ml.c | 14 ++++++++++++-- + plugins/filter_multiline/ml.h | 4 +++- + 2 files changed, 15 insertions(+), 3 deletions(-) + +diff --git a/plugins/filter_multiline/ml.c b/plugins/filter_multiline/ml.c +index 41b1b8a4d64..ced8ec83739 100644 +--- a/plugins/filter_multiline/ml.c ++++ b/plugins/filter_multiline/ml.c +@@ -176,7 +176,7 @@ static int flush_callback(struct flb_ml_parser *parser, + /* Emit record with original tag */ + flb_plg_trace(ctx->ins, "emitting from %s to %s", stream->input_name, stream->tag); + ret = in_emitter_add_record(stream->tag, flb_sds_len(stream->tag), buf_data, buf_size, +- ctx->ins_emitter); ++ ctx->ins_emitter, ctx->i_ins); + + return ret; + } +@@ -526,7 +526,8 @@ static void partial_timer_cb(struct flb_config *config, void *data) + ret = in_emitter_add_record(packer->tag, flb_sds_len(packer->tag), + packer->log_encoder.output_buffer, + packer->log_encoder.output_length, +- ctx->ins_emitter); ++ 
ctx->ins_emitter, ++ ctx->i_ins); + if (ret < 0) { + /* this shouldn't happen in normal execution */ + flb_plg_warn(ctx->ins, +@@ -741,6 +742,15 @@ static int cb_ml_filter(const void *data, size_t bytes, + return FLB_FILTER_NOTOUCH; + } + ++ if (ctx->i_ins == NULL){ ++ ctx->i_ins = i_ins; ++ } ++ if (ctx->i_ins != i_ins) { ++ flb_plg_trace(ctx->ins, "input instance changed from %s to %s", ++ ctx->i_ins->name, i_ins->name); ++ ctx->i_ins = i_ins; ++ } ++ + /* 'partial_message' mode */ + if (ctx->partial_mode == FLB_TRUE) { + return ml_filter_partial(data, bytes, tag, tag_len, +diff --git a/plugins/filter_multiline/ml.h b/plugins/filter_multiline/ml.h +index 59bf6c7e826..cae8fb64166 100644 +--- a/plugins/filter_multiline/ml.h ++++ b/plugins/filter_multiline/ml.h +@@ -73,6 +73,7 @@ struct ml_ctx { + size_t emitter_mem_buf_limit; /* Emitter buffer limit */ + struct flb_input_instance *ins_emitter; /* emitter input plugin instance */ + struct flb_config *config; /* Fluent Bit context */ ++ struct flb_input_instance *i_ins; /* Fluent Bit input instance (last used)*/ + + #ifdef FLB_HAVE_METRICS + struct cmt_counter *cmt_emitted; +@@ -82,6 +83,7 @@ struct ml_ctx { + /* Register external function to emit records, check 'plugins/in_emitter' */ + int in_emitter_add_record(const char *tag, int tag_len, + const char *buf_data, size_t buf_size, +- struct flb_input_instance *in); ++ struct flb_input_instance *in, ++ struct flb_input_instance *i_ins); + + #endif + +From 2087601806b39719ac64c2862f81e7c5222efd3a Mon Sep 17 00:00:00 2001 +From: Richard Treu +Date: Thu, 11 Apr 2024 23:55:40 +0200 +Subject: [PATCH 3/6] filter_rewrite_tag: Pause source input plugins on filter + pause This commit will pause the inputs (sending to rewrite_tag) to not loose + any in-flight records. 
+ +Signed-off-by: Richard Treu +--- + plugins/filter_rewrite_tag/rewrite_tag.c | 7 ++++--- + plugins/filter_rewrite_tag/rewrite_tag.h | 3 ++- + 2 files changed, 6 insertions(+), 4 deletions(-) + +diff --git a/plugins/filter_rewrite_tag/rewrite_tag.c b/plugins/filter_rewrite_tag/rewrite_tag.c +index 01b0f168fe2..c8bfe029350 100644 +--- a/plugins/filter_rewrite_tag/rewrite_tag.c ++++ b/plugins/filter_rewrite_tag/rewrite_tag.c +@@ -355,7 +355,8 @@ static int ingest_inline(struct flb_rewrite_tag *ctx, + */ + static int process_record(const char *tag, int tag_len, msgpack_object map, + const void *buf, size_t buf_size, int *keep, +- struct flb_rewrite_tag *ctx, int *matched) ++ struct flb_rewrite_tag *ctx, int *matched, ++ struct flb_input_instance *i_ins) + { + int ret; + flb_sds_t out_tag; +@@ -404,7 +405,7 @@ static int process_record(const char *tag, int tag_len, msgpack_object map, + if (!ret) { + /* Emit record with new tag */ + ret = in_emitter_add_record(out_tag, flb_sds_len(out_tag), buf, buf_size, +- ctx->ins_emitter); ++ ctx->ins_emitter, i_ins); + } + else { + ret = 0; +@@ -489,7 +490,7 @@ static int cb_rewrite_tag_filter(const void *data, size_t bytes, + * If a record was emitted, the variable 'keep' will define if the record must + * be preserved or not. 
+ */ +- is_emitted = process_record(tag, tag_len, map, (char *) data + pre, off - pre, &keep, ctx, &is_matched); ++ is_emitted = process_record(tag, tag_len, map, (char *) data + pre, off - pre, &keep, ctx, &is_matched, i_ins); + if (is_emitted == FLB_TRUE) { + /* A record with the new tag was emitted */ + emitted_num++; +diff --git a/plugins/filter_rewrite_tag/rewrite_tag.h b/plugins/filter_rewrite_tag/rewrite_tag.h +index 11c0535fde1..d73b49f12eb 100644 +--- a/plugins/filter_rewrite_tag/rewrite_tag.h ++++ b/plugins/filter_rewrite_tag/rewrite_tag.h +@@ -57,7 +57,8 @@ struct flb_rewrite_tag { + /* Register external function to emit records, check 'plugins/in_emitter' */ + int in_emitter_add_record(const char *tag, int tag_len, + const char *buf_data, size_t buf_size, +- struct flb_input_instance *in); ++ struct flb_input_instance *in, ++ struct flb_input_instance *i_ins); + int in_emitter_get_collector_id(struct flb_input_instance *in); + + + +From 64214ada1ded5afc1dae042473b50fa1f8dc9467 Mon Sep 17 00:00:00 2001 +From: Richard Treu +Date: Thu, 11 Apr 2024 23:57:15 +0200 +Subject: [PATCH 4/6] in_emitter: Pause source input plugins on in_emitter + pause This commit will pause all known inputs (sending to multiline) to not + loose any in-flight records. in_emitter will keep track of all sending input + plugins and actively pause/resume them in case in_emitter is paused/resumed. 
+ +Signed-off-by: Richard Treu +--- + plugins/in_emitter/emitter.c | 77 ++++++++++++++++++++++++++++++++++-- + 1 file changed, 73 insertions(+), 4 deletions(-) + +diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c +index 532a629b924..8092a7954ee 100644 +--- a/plugins/in_emitter/emitter.c ++++ b/plugins/in_emitter/emitter.c +@@ -32,7 +32,7 @@ + #define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000 + + /* return values */ +-#define FLB_EMITTER_BUSY 3 ++#define FLB_EMITTER_BUSY -2 + + struct em_chunk { + flb_sds_t tag; +@@ -41,12 +41,18 @@ struct em_chunk { + struct mk_list _head; + }; + ++struct input_ref { ++ struct flb_input_instance *i_ins; ++ struct mk_list _head; ++}; ++ + struct flb_emitter { + int coll_fd; /* collector id */ + struct mk_list chunks; /* list of all pending chunks */ + struct flb_input_instance *ins; /* input instance */ + struct flb_ring_buffer *msgs; /* ring buffer for cross-thread messages */ + int ring_buffer_size; /* size of the ring buffer */ ++ struct mk_list i_ins_list; /* instance list of linked/sending inputs */ + }; + + struct em_chunk *em_chunk_create(const char *tag, int tag_len, +@@ -89,6 +95,12 @@ int static do_in_emitter_add_record(struct em_chunk *ec, + struct flb_emitter *ctx = (struct flb_emitter *) in->context; + int ret; + ++ if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { ++ flb_plg_debug(ctx->ins, "_emitter %s paused. 
Not processing records.", ++ ctx->ins->name); ++ return FLB_EMITTER_BUSY; ++ } ++ + /* Associate this backlog chunk to this instance into the engine */ + ret = flb_input_log_append(in, + ec->tag, flb_sds_len(ec->tag), +@@ -111,15 +123,45 @@ int static do_in_emitter_add_record(struct em_chunk *ec, + */ + int in_emitter_add_record(const char *tag, int tag_len, + const char *buf_data, size_t buf_size, +- struct flb_input_instance *in) ++ struct flb_input_instance *in, ++ struct flb_input_instance *i_ins) + { + struct em_chunk temporary_chunk; + struct mk_list *head; ++ struct input_ref *i_ref; ++ bool ref_found; ++ struct mk_list *tmp; ++ + struct em_chunk *ec; + struct flb_emitter *ctx; + + ctx = (struct flb_emitter *) in->context; + ec = NULL; ++ /* Iterate over list of already known (source) inputs */ ++ /* If new, add it to the list to be able to pause it later on */ ++ ref_found = false; ++ mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { ++ i_ref = mk_list_entry(head, struct input_ref, _head); ++ if(i_ref->i_ins == i_ins){ ++ ref_found = true; ++ break; ++ } ++ } ++ if (!ref_found) { ++ i_ref = flb_malloc(sizeof(struct input_ref)); ++ if (!i_ref) { ++ flb_errno(); ++ return FLB_FILTER_NOTOUCH; ++ } ++ i_ref->i_ins = i_ins; ++ mk_list_add(&i_ref->_head, &ctx->i_ins_list); ++ /* If in_emitter is paused, but new input plugin is not paused, pause it */ ++ if (flb_input_buf_paused(ctx->ins) == FLB_TRUE && ++ flb_input_buf_paused(i_ins) == FLB_FALSE) { ++ flb_input_pause(i_ins); ++ } ++ } ++ + + /* Restricted by mem_buf_limit */ + if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { +@@ -268,6 +310,8 @@ static int cb_emitter_init(struct flb_input_instance *in, + ctx->ins = in; + mk_list_init(&ctx->chunks); + ++ mk_list_init(&ctx->i_ins_list); ++ + + ret = flb_input_config_map_set(in, (void *) ctx); + if (ret == -1) { +@@ -294,7 +338,7 @@ static int cb_emitter_init(struct flb_input_instance *in, + } + } + else{ +- ret = flb_input_set_collector_time(in, cb_queue_chunks, 
0, 50000000, config); ++ ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 25000000, config); + if (ret < 0) { + flb_error("[in_emitter] could not create collector"); + flb_free(ctx); +@@ -312,13 +356,31 @@ static int cb_emitter_init(struct flb_input_instance *in, + static void cb_emitter_pause(void *data, struct flb_config *config) + { + struct flb_emitter *ctx = data; ++ struct mk_list *tmp; ++ struct mk_list *head; ++ struct input_ref *i_ref; ++ ++ /* Pause all known senders */ + flb_input_collector_pause(ctx->coll_fd, ctx->ins); ++ mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { ++ i_ref = mk_list_entry(head, struct input_ref, _head); ++ flb_input_pause(i_ref->i_ins); ++ } + } + + static void cb_emitter_resume(void *data, struct flb_config *config) + { + struct flb_emitter *ctx = data; ++ struct mk_list *tmp; ++ struct mk_list *head; ++ struct input_ref *i_ref; ++ ++ /* Resume all known senders */ + flb_input_collector_resume(ctx->coll_fd, ctx->ins); ++ mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { ++ i_ref = mk_list_entry(head, struct input_ref, _head); ++ flb_input_resume(i_ref->i_ins); ++ } + } + + static int cb_emitter_exit(void *data, struct flb_config *config) +@@ -328,9 +390,9 @@ static int cb_emitter_exit(void *data, struct flb_config *config) + struct flb_emitter *ctx = data; + struct em_chunk *echunk; + struct em_chunk ec; ++ struct input_ref *i_ref; + int ret; + +- + mk_list_foreach_safe(head, tmp, &ctx->chunks) { + echunk = mk_list_entry(head, struct em_chunk, _head); + mk_list_del(&echunk->_head); +@@ -346,6 +408,13 @@ static int cb_emitter_exit(void *data, struct flb_config *config) + flb_ring_buffer_destroy(ctx->msgs); + } + ++ mk_list_foreach_safe(head,tmp, &ctx->i_ins_list) { ++ i_ref = mk_list_entry(head, struct input_ref, _head); ++ mk_list_del(&i_ref->_head); ++ flb_free(i_ref); ++ } ++ ++ + flb_free(ctx); + return 0; + } + +From f6137ec60bdffc6f5c80e491b463541702438772 Mon Sep 17 00:00:00 2001 +From: Richard Treu +Date: 
Fri, 12 Apr 2024 00:00:39 +0200 +Subject: [PATCH 5/6] flb_input: Add missing input resume message This commit + will add a resume message, when a paused input plugin is resumed. + +Signed-off-by: Richard Treu +--- + src/flb_input.c | 1 + + 1 file changed, 1 insertion(+) + +diff --git a/src/flb_input.c b/src/flb_input.c +index a990a9d2805..7b614ccdb44 100644 +--- a/src/flb_input.c ++++ b/src/flb_input.c +@@ -1729,6 +1729,7 @@ int flb_input_resume(struct flb_input_instance *ins) + flb_input_thread_instance_resume(ins); + } + else { ++ flb_info("[input] resume %s", flb_input_name(ins)); + ins->p->cb_resume(ins->context, ins->config); + } + } + +From 3162d0c3db2f7df9392c6d880280b923002066b1 Mon Sep 17 00:00:00 2001 +From: Richard Treu +Date: Fri, 12 Apr 2024 00:02:03 +0200 +Subject: [PATCH 6/6] tests: filter_multiline: Add test for in_emitter pause by + using multiline This commit will add a test for pause functionality of + in_emitter. The test uses a small emitter buffer size, so the in_emitter will + definitely be paused. 
+ +Signed-off-by: Richard Treu +--- + tests/runtime/filter_multiline.c | 124 +++++++++++++++++++++++++++++++ + 1 file changed, 124 insertions(+) + +diff --git a/tests/runtime/filter_multiline.c b/tests/runtime/filter_multiline.c +index 18253a5b2c7..ed6ffb6b7cb 100644 +--- a/tests/runtime/filter_multiline.c ++++ b/tests/runtime/filter_multiline.c +@@ -2,6 +2,7 @@ + + #include + #include ++#include + #include "flb_tests_runtime.h" + + struct filter_test { +@@ -120,7 +121,34 @@ static int cb_check_str_list(void *record, size_t size, void *data) + return 0; + } + ++void wait_with_timeout(uint32_t timeout_ms, int *output_num, int expected) ++{ ++ struct flb_time start_time; ++ struct flb_time end_time; ++ struct flb_time diff_time; ++ uint64_t elapsed_time_flb = 0; ++ ++ flb_time_get(&start_time); ++ ++ while (true) { ++ *output_num = get_output_num(); ++ ++ if (*output_num == expected) { ++ break; ++ } ++ ++ flb_time_msleep(100); ++ flb_time_get(&end_time); ++ flb_time_diff(&end_time, &start_time, &diff_time); ++ elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000; + ++ if (elapsed_time_flb > timeout_ms) { ++ flb_warn("[timeout] elapsed_time: %ld", elapsed_time_flb); ++ // Reached timeout. 
++ break; ++ } ++ } ++} + + static struct filter_test *filter_test_create(struct flb_lib_out_cb *data) + { +@@ -682,6 +710,100 @@ static void flb_test_ml_buffered_16_streams() + filter_test_destroy(ctx); + } + ++/* This test will test the pausing of in_emitter */ ++static void flb_test_ml_buffered_16_streams_pausing() ++{ ++ struct flb_lib_out_cb cb_data; ++ struct filter_test *ctx; ++ int i_ffds[16] = {0}; ++ int ffd_num = sizeof(i_ffds)/sizeof(int); ++ int ret; ++ int i; ++ int j; ++ int bytes; ++ int len; ++ char line_buf[2048] = {0}; ++ char tag_buf[32] = {0}; ++ int line_num; ++ int num; ++ ++ char *expected_strs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property\\n at com.example.myproject.Author.getBookIds(xx.java:38)\\n at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\\nCaused by: java.lang.NullPointerException\\n at com.example.myproject.Book.getId(Book.java:22)\\n at com.example.myproject.Author.getBookIds(Author.java:35)\\n ... 1 more"}; ++ ++ struct str_list expected = { ++ .size = sizeof(expected_strs)/sizeof(char*), ++ .lists = &expected_strs[0], ++ .ignore_min_line_num = 64, ++ }; ++ ++ char *ml_logs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property", ++ " at com.example.myproject.Author.getBookIds(xx.java:38)", ++ " at com.example.myproject.Bootstrap.main(Bootstrap.java:14)", ++ "Caused by: java.lang.NullPointerException", ++ " at com.example.myproject.Book.getId(Book.java:22)", ++ " at com.example.myproject.Author.getBookIds(Author.java:35)", ++ " ... 
1 more", ++ "single line"}; ++ ++ cb_data.cb = cb_check_str_list; ++ cb_data.data = (void *)&expected; ++ ++ clear_output_num(); ++ ++ line_num = sizeof(ml_logs)/sizeof(char*); ++ ++ /* Create test context */ ++ ctx = filter_test_create((void *) &cb_data); ++ if (!ctx) { ++ exit(EXIT_FAILURE); ++ } ++ flb_service_set(ctx->flb, ++ "Flush", "0.100000000", ++ "Grace", "2", ++ NULL); ++ ++ i_ffds[0] = ctx->i_ffd; ++ for (i=1; i<ffd_num; i++) { ++ i_ffds[i] = flb_input(ctx->flb, (char *) "lib", NULL); ++ TEST_CHECK(i_ffds[i] >= 0); ++ sprintf(&tag_buf[0], "test%d", i); ++ flb_input_set(ctx->flb, i_ffds[i], "tag", tag_buf, NULL); ++ } ++ ++ /* Configure filter */ ++ /* Set mem_buf_limit small, so in_emitter will be paused */ ++ ret = flb_filter_set(ctx->flb, ctx->f_ffd, ++ "multiline.key_content", "log", ++ "multiline.parser", "java", ++ "buffer", "on", ++ "debug_flush", "on", ++ "emitter_mem_buf_limit", "1k", ++ NULL); ++ TEST_CHECK(ret == 0); ++ ++ ++ /* Start the engine */ ++ ret = flb_start(ctx->flb); ++ TEST_CHECK(ret == 0); ++ ++ for (i=0; i<line_num; i++) { ++ sprintf(&line_buf[0], "[%d, {\"log\":\"%s\"}]", i, ml_logs[i]); ++ len = strlen(line_buf); ++ for (j=0; j<ffd_num; j++) { ++ bytes = flb_lib_push(ctx->flb, i_ffds[j], &line_buf[0], len); ++ TEST_CHECK(bytes == len); ++ } ++ } ++ wait_with_timeout(20000, &num, ffd_num); ++ ++ if (!TEST_CHECK(num > 0)) { ++ TEST_MSG("output error.
got %d expect more than 0 records.", num); ++ /* The internal flb_lib_push cannot be paused, so records may be lost */ ++ /* However, there should be at least some records */ ++ } ++ ++ filter_test_destroy(ctx); ++} ++ + + + +@@ -695,5 +817,7 @@ TEST_LIST = { + + {"flb_test_multiline_partial_message_concat" , flb_test_multiline_partial_message_concat }, + {"flb_test_multiline_partial_message_concat_two_ids" , flb_test_multiline_partial_message_concat_two_ids }, ++ ++ {"ml_buffered_16_streams_pausing" , flb_test_ml_buffered_16_streams_pausing }, + {NULL, NULL} + }; From 418686d1cde2f9f74eb5d777bb0c117f1aac373e Mon Sep 17 00:00:00 2001 From: Sindhu Karri Date: Mon, 3 Jun 2024 04:29:08 +0000 Subject: [PATCH 2/2] Fix fluent-bit issue 8025 in_tail: missing log for offset processing due to non-existent old inodes in sqlite --- SPECS/fluent-bit/fix_issue_8025.patch | 779 ++++++++++++++++++++++++++ SPECS/fluent-bit/fluent-bit.spec | 2 + 2 files changed, 781 insertions(+) create mode 100644 SPECS/fluent-bit/fix_issue_8025.patch diff --git a/SPECS/fluent-bit/fix_issue_8025.patch b/SPECS/fluent-bit/fix_issue_8025.patch new file mode 100644 index 00000000000..d5d97590822 --- /dev/null +++ b/SPECS/fluent-bit/fix_issue_8025.patch @@ -0,0 +1,779 @@ +From c60999c186c23cff79dad4dd31c838404ace228e Mon Sep 17 00:00:00 2001 +From: "jinyong.choi" +Date: Wed, 18 Oct 2023 23:58:38 +0900 +Subject: [PATCH 1/2] in_tail: Delete unmanaged inodes from db during startup + (#8025) (1/2) + +To prevent incorrect inode references, +FluentBit automatically removes unmanaged inodes during startup. 
+ +Signed-off-by: jinyong.choi +--- + plugins/in_tail/tail.c | 9 ++ + plugins/in_tail/tail_db.c | 161 +++++++++++++++++++++++++++++++ + plugins/in_tail/tail_db.h | 3 + + plugins/in_tail/tail_sql.h | 22 +++++ + tests/runtime/in_tail.c | 189 +++++++++++++++++++++++++++++++++++++ + 5 files changed, 384 insertions(+) + +diff --git a/plugins/in_tail/tail.c b/plugins/in_tail/tail.c +index 34a0fec3dbd..37b1f4f6c68 100644 +--- a/plugins/in_tail/tail.c ++++ b/plugins/in_tail/tail.c +@@ -372,6 +372,15 @@ static int in_tail_init(struct flb_input_instance *in, + /* Scan path */ + flb_tail_scan(ctx->path_list, ctx); + ++#ifdef FLB_HAVE_SQLDB ++ /* Delete stale files that are not monitored from the database */ ++ ret = flb_tail_db_stale_file_delete(in, config, ctx); ++ if (ret == -1) { ++ flb_tail_config_destroy(ctx); ++ return -1; ++ } ++#endif ++ + /* + * After the first scan (on start time), all new files discovered needs to be + * read from head, so we switch the 'read_from_head' flag to true so any +diff --git a/plugins/in_tail/tail_db.c b/plugins/in_tail/tail_db.c +index 664963b6dba..99242f8a15b 100644 +--- a/plugins/in_tail/tail_db.c ++++ b/plugins/in_tail/tail_db.c +@@ -168,6 +168,42 @@ static int db_file_insert(struct flb_tail_file *file, struct flb_tail_config *ct + return flb_sqldb_last_id(ctx->db); + } + ++static int stmt_add_param_concat(struct flb_tail_config *ctx, ++ flb_sds_t *stmt_sql, uint64_t count) ++{ ++ uint64_t idx; ++ flb_sds_t sds_tmp; ++ ++ sds_tmp = flb_sds_cat(*stmt_sql, SQL_STMT_START_PARAM, ++ SQL_STMT_START_PARAM_LEN); ++ if (sds_tmp == NULL) { ++ flb_plg_debug(ctx->ins, "error concatenating stmt_sql: param start"); ++ return -1; ++ } ++ *stmt_sql = sds_tmp; ++ ++ for (idx = 1; idx < count; idx++) { ++ sds_tmp = flb_sds_cat(*stmt_sql, SQL_STMT_ADD_PARAM, ++ SQL_STMT_ADD_PARAM_LEN); ++ if (sds_tmp == NULL) { ++ flb_plg_debug(ctx->ins, "error concatenating stmt_sql: add param"); ++ return -1; ++ } ++ ++ *stmt_sql = sds_tmp; ++ } ++ ++ sds_tmp = 
flb_sds_cat(*stmt_sql, SQL_STMT_PARAM_END, ++ SQL_STMT_PARAM_END_LEN); ++ if (sds_tmp == NULL) { ++ flb_plg_debug(ctx->ins, "error concatenating stmt_sql: param end"); ++ return -1; ++ } ++ *stmt_sql = sds_tmp; ++ ++ return 0; ++} ++ + int flb_tail_db_file_set(struct flb_tail_file *file, + struct flb_tail_config *ctx) + { +@@ -275,3 +311,128 @@ int flb_tail_db_file_delete(struct flb_tail_file *file, + flb_plg_debug(ctx->ins, "db: file deleted from database: %s", file->name); + return 0; + } ++ ++/* ++ * Delete stale file from database ++ */ ++int flb_tail_db_stale_file_delete(struct flb_input_instance *ins, ++ struct flb_config *config, ++ struct flb_tail_config *ctx) ++{ ++ int ret = -1; ++ size_t sql_size; ++ uint64_t idx; ++ uint64_t file_count = ctx->files_static_count; ++ flb_sds_t stale_delete_sql; ++ flb_sds_t sds_tmp; ++ sqlite3_stmt *stmt_delete_inodes = NULL; ++ struct mk_list *tmp; ++ struct mk_list *head; ++ struct flb_tail_file *file; ++ ++ if (!ctx->db) { ++ return 0; ++ } ++ ++ /* Create a stmt sql buffer */ ++ sql_size = SQL_DELETE_STALE_FILE_START_LEN; ++ sql_size += SQL_DELETE_STALE_FILE_WHERE_LEN; ++ sql_size += SQL_STMT_START_PARAM_LEN; ++ sql_size += SQL_STMT_PARAM_END_LEN; ++ sql_size += SQL_STMT_END_LEN; ++ if (file_count > 0) { ++ sql_size += (SQL_STMT_ADD_PARAM_LEN * file_count); ++ } ++ ++ stale_delete_sql = flb_sds_create_size(sql_size + 1); ++ if (!stale_delete_sql) { ++ flb_plg_error(ctx->ins, "cannot allocate buffer for stale_delete_sql:" ++ " size: %zu", sql_size); ++ return -1; ++ } ++ ++ /* Create a stmt sql */ ++ sds_tmp = flb_sds_cat(stale_delete_sql, SQL_DELETE_STALE_FILE_START, ++ SQL_DELETE_STALE_FILE_START_LEN); ++ if (sds_tmp == NULL) { ++ flb_plg_error(ctx->ins, ++ "error concatenating stale_delete_sql: start"); ++ flb_sds_destroy(stale_delete_sql); ++ return -1; ++ } ++ stale_delete_sql = sds_tmp; ++ ++ if (file_count > 0) { ++ sds_tmp = flb_sds_cat(stale_delete_sql, SQL_DELETE_STALE_FILE_WHERE, ++ 
SQL_DELETE_STALE_FILE_WHERE_LEN); ++ if (sds_tmp == NULL) { ++ flb_plg_error(ctx->ins, ++ "error concatenating stale_delete_sql: where"); ++ flb_sds_destroy(stale_delete_sql); ++ return -1; ++ } ++ stale_delete_sql = sds_tmp; ++ ++ ret = stmt_add_param_concat(ctx, &stale_delete_sql, file_count); ++ if (ret == -1) { ++ flb_plg_error(ctx->ins, ++ "error concatenating stale_delete_sql: param"); ++ flb_sds_destroy(stale_delete_sql); ++ return -1; ++ } ++ } ++ ++ sds_tmp = flb_sds_cat(stale_delete_sql, SQL_STMT_END, SQL_STMT_END_LEN); ++ if (sds_tmp == NULL) { ++ flb_plg_error(ctx->ins, ++ "error concatenating stale_delete_sql: end"); ++ flb_sds_destroy(stale_delete_sql); ++ return -1; ++ } ++ stale_delete_sql = sds_tmp; ++ ++ /* Prepare stmt */ ++ ret = sqlite3_prepare_v2(ctx->db->handler, stale_delete_sql, -1, ++ &stmt_delete_inodes, 0); ++ if (ret != SQLITE_OK) { ++ flb_plg_error(ctx->ins, "error preparing database SQL statement:" ++ " stmt_delete_inodes sql:%s, ret=%d", stale_delete_sql, ++ ret); ++ flb_sds_destroy(stale_delete_sql); ++ return -1; ++ } ++ ++ /* Bind parameters */ ++ idx = 1; ++ mk_list_foreach_safe(head, tmp, &ctx->files_static) { ++ file = mk_list_entry(head, struct flb_tail_file, _head); ++ ret = sqlite3_bind_int64(stmt_delete_inodes, idx, file->inode); ++ if (ret != SQLITE_OK) { ++ flb_plg_error(ctx->ins, "error binding to stmt_delete_inodes:" ++ " inode=%lu, ret=%d", file->inode, ret); ++ sqlite3_finalize(stmt_delete_inodes); ++ flb_sds_destroy(stale_delete_sql); ++ return -1; ++ } ++ idx++; ++ } ++ ++ /* Run the delete inodes */ ++ ret = sqlite3_step(stmt_delete_inodes); ++ if (ret != SQLITE_DONE) { ++ sqlite3_finalize(stmt_delete_inodes); ++ flb_sds_destroy(stale_delete_sql); ++ flb_plg_error(ctx->ins, "cannot execute delete stale inodes: ret=%d", ++ ret); ++ return -1; ++ } ++ ++ ret = sqlite3_changes(ctx->db->handler); ++ flb_plg_info(ctx->ins, "db: delete unmonitored stale inodes from the" ++ " database: count=%d", ret); ++ ++ 
sqlite3_finalize(stmt_delete_inodes); ++ flb_sds_destroy(stale_delete_sql); ++ ++ return 0; ++} +diff --git a/plugins/in_tail/tail_db.h b/plugins/in_tail/tail_db.h +index 7b5355d229c..b1fde721d29 100644 +--- a/plugins/in_tail/tail_db.h ++++ b/plugins/in_tail/tail_db.h +@@ -40,4 +40,7 @@ int flb_tail_db_file_rotate(const char *new_name, + struct flb_tail_config *ctx); + int flb_tail_db_file_delete(struct flb_tail_file *file, + struct flb_tail_config *ctx); ++int flb_tail_db_stale_file_delete(struct flb_input_instance *ins, ++ struct flb_config *config, ++ struct flb_tail_config *ctx); + #endif +diff --git a/plugins/in_tail/tail_sql.h b/plugins/in_tail/tail_sql.h +index 855933a0149..bf724f318cd 100644 +--- a/plugins/in_tail/tail_sql.h ++++ b/plugins/in_tail/tail_sql.h +@@ -53,6 +53,28 @@ + #define SQL_DELETE_FILE \ + "DELETE FROM in_tail_files WHERE id=@id;" + ++#define SQL_STMT_START_PARAM "(?" ++#define SQL_STMT_START_PARAM_LEN (sizeof(SQL_STMT_START_PARAM) - 1) ++ ++#define SQL_STMT_ADD_PARAM ",?" 
++#define SQL_STMT_ADD_PARAM_LEN (sizeof(SQL_STMT_ADD_PARAM) - 1) ++ ++#define SQL_STMT_PARAM_END ")" ++#define SQL_STMT_PARAM_END_LEN (sizeof(SQL_STMT_PARAM_END) - 1) ++ ++#define SQL_STMT_END ";" ++#define SQL_STMT_END_LEN (sizeof(SQL_STMT_END) - 1) ++ ++#define SQL_DELETE_STALE_FILE_START \ ++ "DELETE FROM in_tail_files " ++#define SQL_DELETE_STALE_FILE_START_LEN \ ++ (sizeof(SQL_DELETE_STALE_FILE_START) - 1) ++ ++#define SQL_DELETE_STALE_FILE_WHERE \ ++ "WHERE inode NOT IN " ++#define SQL_DELETE_STALE_FILE_WHERE_LEN \ ++ (sizeof(SQL_DELETE_STALE_FILE_WHERE) - 1) ++ + #define SQL_PRAGMA_SYNC \ + "PRAGMA synchronous=%i;" + +diff --git a/tests/runtime/in_tail.c b/tests/runtime/in_tail.c +index ee5fba88744..74accb66ed6 100644 +--- a/tests/runtime/in_tail.c ++++ b/tests/runtime/in_tail.c +@@ -1545,6 +1545,194 @@ void flb_test_db() + test_tail_ctx_destroy(ctx); + unlink(db); + } ++ ++void flb_test_db_delete_stale_file() ++{ ++ struct flb_lib_out_cb cb_data; ++ struct test_tail_ctx *ctx; ++ char *org_file[] = {"test_db.log", "test_db_stale.log"}; ++ char *tmp_file[] = {"test_db.log"}; ++ char *path = "test_db.log, test_db_stale.log"; ++ char *move_file[] = {"test_db_stale.log", "test_db_stale_new.log"}; ++ char *new_file[] = {"test_db.log", "test_db_stale_new.log"}; ++ char *new_path = "test_db.log, test_db_stale_new.log"; ++ char *db = "test_db.db"; ++ char *msg_init = "hello world"; ++ char *msg_end = "hello db end"; ++ int i; ++ int ret; ++ int num; ++ int unused; ++ ++ unlink(db); ++ ++ clear_output_num(); ++ ++ cb_data.cb = cb_count_msgpack; ++ cb_data.data = &unused; ++ ++ ctx = test_tail_ctx_create(&cb_data, ++ &org_file[0], ++ sizeof(org_file)/sizeof(char *), ++ FLB_FALSE); ++ if (!TEST_CHECK(ctx != NULL)) { ++ TEST_MSG("test_ctx_create failed"); ++ exit(EXIT_FAILURE); ++ } ++ ++ ret = flb_input_set(ctx->flb, ctx->o_ffd, ++ "path", path, ++ "read_from_head", "true", ++ "db", db, ++ "db.sync", "full", ++ NULL); ++ TEST_CHECK(ret == 0); ++ ++ ret = 
flb_output_set(ctx->flb, ctx->o_ffd, ++ NULL); ++ TEST_CHECK(ret == 0); ++ ++ /* Start the engine */ ++ ret = flb_start(ctx->flb); ++ TEST_CHECK(ret == 0); ++ ++ ret = write_msg(ctx, msg_init, strlen(msg_init)); ++ if (!TEST_CHECK(ret > 0)) { ++ test_tail_ctx_destroy(ctx); ++ unlink(db); ++ exit(EXIT_FAILURE); ++ } ++ ++ /* waiting to flush */ ++ flb_time_msleep(500); ++ ++ num = get_output_num(); ++ if (!TEST_CHECK(num > 0)) { ++ TEST_MSG("no output"); ++ } ++ ++ if (ctx->fds != NULL) { ++ for (i=0; i<ctx->fd_num; i++) { ++ close(ctx->fds[i]); ++ } ++ flb_free(ctx->fds); ++ } ++ flb_stop(ctx->flb); ++ flb_destroy(ctx->flb); ++ flb_free(ctx); ++ ++ /* re-init to use db */ ++ clear_output_num(); ++ ++ /* ++ * Changing the file name from 'test_db_stale.log' to ++ * 'test_db_stale_new.log.' In this scenario, it is assumed that the ++ * file was deleted after the FluentBit was terminated. However, since ++ * the FluentBit was shutdown, the inode remains in the database. ++ * The reason for renaming is to preserve the existing file for later use. ++ */ ++ ret = rename(move_file[0], move_file[1]); ++ TEST_CHECK(ret == 0); ++ ++ cb_data.cb = cb_count_msgpack; ++ cb_data.data = &unused; ++ ++ ctx = test_tail_ctx_create(&cb_data, ++ &tmp_file[0], ++ sizeof(tmp_file)/sizeof(char *), ++ FLB_FALSE); ++ if (!TEST_CHECK(ctx != NULL)) { ++ TEST_MSG("test_ctx_create failed"); ++ unlink(db); ++ exit(EXIT_FAILURE); ++ } ++ ++ ret = flb_input_set(ctx->flb, ctx->o_ffd, ++ "path", path, ++ "read_from_head", "true", ++ "db", db, ++ "db.sync", "full", ++ NULL); ++ TEST_CHECK(ret == 0); ++ ++ /* ++ * Start the engine ++ * FluentBit will delete stale inodes.
++ */ ++ ret = flb_start(ctx->flb); ++ TEST_CHECK(ret == 0); ++ ++ /* waiting to flush */ ++ flb_time_msleep(500); ++ ++ if (ctx->fds != NULL) { ++ for (i=0; i<ctx->fd_num; i++) { ++ close(ctx->fds[i]); ++ } ++ flb_free(ctx->fds); ++ } ++ flb_stop(ctx->flb); ++ flb_destroy(ctx->flb); ++ flb_free(ctx); ++ ++ /* re-init to use db */ ++ clear_output_num(); ++ ++ cb_data.cb = cb_count_msgpack; ++ cb_data.data = &unused; ++ ++ ctx = test_tail_ctx_create(&cb_data, ++ &new_file[0], ++ sizeof(new_file)/sizeof(char *), ++ FLB_FALSE); ++ if (!TEST_CHECK(ctx != NULL)) { ++ TEST_MSG("test_ctx_create failed"); ++ unlink(db); ++ exit(EXIT_FAILURE); ++ } ++ ++ ret = flb_input_set(ctx->flb, ctx->o_ffd, ++ "path", new_path, ++ "read_from_head", "true", ++ "db", db, ++ "db.sync", "full", ++ NULL); ++ TEST_CHECK(ret == 0); ++ ++ /* ++ * Start the engine ++ * 'test_db_stale_new.log.' is a new file. ++ * The inode of 'test_db_stale.log' was deleted previously. ++ * So, it reads from the beginning of the file. ++ */ ++ ret = flb_start(ctx->flb); ++ TEST_CHECK(ret == 0); ++ ++ /* waiting to flush */ ++ flb_time_msleep(500); ++ ++ ret = write_msg(ctx, msg_end, strlen(msg_end)); ++ if (!TEST_CHECK(ret > 0)) { ++ test_tail_ctx_destroy(ctx); ++ unlink(db); ++ exit(EXIT_FAILURE); ++ } ++ ++ /* waiting to flush */ ++ flb_time_msleep(500); ++ ++ num = get_output_num(); ++ if (!TEST_CHECK(num == 3)) { ++ /* 3 = ++ * test_db.log : "hello db end" ++ * test_db_stale.log : "msg_init" + "hello db end" ++ */ ++ TEST_MSG("num error.
expect=3 got=%d", num); ++ } ++ ++ test_tail_ctx_destroy(ctx); ++ unlink(db); ++} + #endif /* FLB_HAVE_SQLDB */ + + /* Test list */ +@@ -1569,6 +1757,7 @@ TEST_LIST = { + + #ifdef FLB_HAVE_SQLDB + {"db", flb_test_db}, ++ {"db_delete_stale_file", flb_test_db_delete_stale_file}, + #endif + + #ifdef in_tail + +From d06114cbb1419ef9e8969b897730de07b64cfe28 Mon Sep 17 00:00:00 2001 +From: "jinyong.choi" +Date: Thu, 19 Oct 2023 00:37:36 +0900 +Subject: [PATCH 2/2] in_tail: Introducing the compare_filename option to + db_file_exists (#8025)(2/2) + +When checking the existence of a file's inode, if the 'compare_filename' +option is enabled, it is modified to compare the filename as well. +If the inode matches but the filename is different, it removes the stale +inode from the database. + +Signed-off-by: jinyong.choi +--- + plugins/in_tail/tail.c | 8 ++ + plugins/in_tail/tail_config.h | 1 + + plugins/in_tail/tail_db.c | 58 ++++++++++++- + tests/runtime/in_tail.c | 148 ++++++++++++++++++++++++++++++++++ + 4 files changed, 213 insertions(+), 2 deletions(-) + +diff --git a/plugins/in_tail/tail.c b/plugins/in_tail/tail.c +index 37b1f4f6c68..52bf2ed6d40 100644 +--- a/plugins/in_tail/tail.c ++++ b/plugins/in_tail/tail.c +@@ -734,6 +734,14 @@ static struct flb_config_map config_map[] = { + "provides higher performance. Note that WAL is not compatible with " + "shared network file systems." + }, ++ { ++ FLB_CONFIG_MAP_BOOL, "db.compare_filename", "false", ++ 0, FLB_TRUE, offsetof(struct flb_tail_config, compare_filename), ++ "This option determines whether to check both the inode and the filename " ++ "when retrieving file information from the db." ++ "'true' verifies both the inode and filename, while 'false' checks only " ++ "the inode (default)." 
++ }, + #endif + + /* Multiline Options */ +diff --git a/plugins/in_tail/tail_config.h b/plugins/in_tail/tail_config.h +index dcfa54e0264..c0263b46503 100644 +--- a/plugins/in_tail/tail_config.h ++++ b/plugins/in_tail/tail_config.h +@@ -107,6 +107,7 @@ struct flb_tail_config { + struct flb_sqldb *db; + int db_sync; + int db_locking; ++ int compare_filename; + flb_sds_t db_journal_mode; + sqlite3_stmt *stmt_get_file; + sqlite3_stmt *stmt_insert_file; +diff --git a/plugins/in_tail/tail_db.c b/plugins/in_tail/tail_db.c +index 99242f8a15b..6f535ea646b 100644 +--- a/plugins/in_tail/tail_db.c ++++ b/plugins/in_tail/tail_db.c +@@ -95,9 +95,38 @@ int flb_tail_db_close(struct flb_sqldb *db) + return 0; + } + ++static int flb_tail_db_file_delete_by_id(struct flb_tail_config *ctx, ++ uint64_t id) ++{ ++ int ret; ++ ++ /* Bind parameters */ ++ ret = sqlite3_bind_int64(ctx->stmt_delete_file, 1, id); ++ if (ret != SQLITE_OK) { ++ flb_plg_error(ctx->ins, "db: error binding id=%"PRIu64", ret=%d", id, ret); ++ return -1; ++ } ++ ++ ret = sqlite3_step(ctx->stmt_delete_file); ++ ++ sqlite3_clear_bindings(ctx->stmt_delete_file); ++ sqlite3_reset(ctx->stmt_delete_file); ++ ++ if (ret != SQLITE_DONE) { ++ flb_plg_error(ctx->ins, "db: error deleting stale entry from database:" ++ " id=%"PRIu64, id); ++ return -1; ++ } ++ ++ flb_plg_info(ctx->ins, "db: stale file deleted from database:" ++ " id=%"PRIu64, id); ++ return 0; ++} ++ + /* +- * Check if an file inode exists in the database. Return FLB_TRUE or +- * FLB_FALSE ++ * Check if an file inode exists in the database. ++ * If the 'compare_filename' option is enabled, ++ * it checks along with the filename. 
Return FLB_TRUE or FLB_FALSE + */ + static int db_file_exists(struct flb_tail_file *file, + struct flb_tail_config *ctx, +@@ -105,6 +134,7 @@ static int db_file_exists(struct flb_tail_file *file, + { + int ret; + int exists = FLB_FALSE; ++ const unsigned char *name; + + /* Bind parameters */ + sqlite3_bind_int64(ctx->stmt_get_file, 1, file->inode); +@@ -116,11 +146,30 @@ static int db_file_exists(struct flb_tail_file *file, + /* id: column 0 */ + *id = sqlite3_column_int64(ctx->stmt_get_file, 0); + ++ /* name: column 1 */ ++ name = sqlite3_column_text(ctx->stmt_get_file, 1); ++ if (ctx->compare_filename && name == NULL) { ++ flb_plg_error(ctx->ins, "db: error getting name: id=%"PRIu64, *id); ++ return -1; ++ } ++ + /* offset: column 2 */ + *offset = sqlite3_column_int64(ctx->stmt_get_file, 2); + + /* inode: column 3 */ + *inode = sqlite3_column_int64(ctx->stmt_get_file, 3); ++ ++ /* Checking if the file's name and inode match exactly */ ++ if (ctx->compare_filename) { ++ if (flb_tail_target_file_name_cmp((char *) name, file) != 0) { ++ exists = FLB_FALSE; ++ flb_plg_debug(ctx->ins, "db: exists stale file from database:" ++ " id=%"PRIu64" inode=%"PRIu64" offset=%"PRIu64 ++ " name=%s file_inode=%"PRIu64" file_name=%s", ++ *id, *inode, *offset, name, file->inode, ++ file->name); ++ } ++ } + } + else if (ret == SQLITE_DONE) { + /* all good */ +@@ -221,6 +270,11 @@ int flb_tail_db_file_set(struct flb_tail_file *file, + } + + if (ret == FLB_FALSE) { ++ /* Delete stale file of same inode */ ++ if (ctx->compare_filename && id > 0) { ++ flb_tail_db_file_delete_by_id(ctx, id); ++ } ++ + /* Get the database ID for this file */ + file->db_id = db_file_insert(file, ctx); + } +diff --git a/tests/runtime/in_tail.c b/tests/runtime/in_tail.c +index 74accb66ed6..90d8832bc79 100644 +--- a/tests/runtime/in_tail.c ++++ b/tests/runtime/in_tail.c +@@ -1733,6 +1733,153 @@ void flb_test_db_delete_stale_file() + test_tail_ctx_destroy(ctx); + unlink(db); + } ++ ++void 
flb_test_db_compare_filename() ++{ ++ struct flb_lib_out_cb cb_data; ++ struct test_tail_ctx *ctx; ++ char *org_file[] = {"test_db.log"}; ++ char *moved_file[] = {"test_db_moved.log"}; ++ char *db = "test_db.db"; ++ char *msg_init = "hello world"; ++ char *msg_moved = "hello world moved"; ++ char *msg_end = "hello db end"; ++ int i; ++ int ret; ++ int num; ++ int unused; ++ ++ unlink(db); ++ ++ clear_output_num(); ++ ++ cb_data.cb = cb_count_msgpack; ++ cb_data.data = &unused; ++ ++ ctx = test_tail_ctx_create(&cb_data, ++ &org_file[0], ++ sizeof(org_file)/sizeof(char *), ++ FLB_FALSE); ++ if (!TEST_CHECK(ctx != NULL)) { ++ TEST_MSG("test_ctx_create failed"); ++ exit(EXIT_FAILURE); ++ } ++ ++ ret = flb_input_set(ctx->flb, ctx->o_ffd, ++ "path", org_file[0], ++ "read_from_head", "true", ++ "db", db, ++ "db.sync", "full", ++ "db.compare_filename", "true", ++ NULL); ++ TEST_CHECK(ret == 0); ++ ++ ret = flb_output_set(ctx->flb, ctx->o_ffd, ++ NULL); ++ TEST_CHECK(ret == 0); ++ ++ /* Start the engine */ ++ ret = flb_start(ctx->flb); ++ TEST_CHECK(ret == 0); ++ ++ ret = write_msg(ctx, msg_init, strlen(msg_init)); ++ if (!TEST_CHECK(ret > 0)) { ++ test_tail_ctx_destroy(ctx); ++ unlink(db); ++ exit(EXIT_FAILURE); ++ } ++ ++ /* waiting to flush */ ++ flb_time_msleep(500); ++ ++ num = get_output_num(); ++ if (!TEST_CHECK(num > 0)) { ++ TEST_MSG("no output"); ++ } ++ ++ if (ctx->fds != NULL) { ++ for (i=0; i<ctx->fd_num; i++) { ++ close(ctx->fds[i]); ++ } ++ flb_free(ctx->fds); ++ } ++ flb_stop(ctx->flb); ++ flb_destroy(ctx->flb); ++ flb_free(ctx); ++ ++ /* re-init to use db */ ++ clear_output_num(); ++ ++ /* ++ * Changing the file name from 'test_db.log' to 'test_db_moved.log.' ++ * In this scenario, it is assumed that the FluentBit has been terminated, ++ * and the file has been recreated with the same inode, with offsets equal ++ * to or greater than the previous file.
++ */ ++ ret = rename(org_file[0], moved_file[0]); ++ TEST_CHECK(ret == 0); ++ ++ cb_data.cb = cb_count_msgpack; ++ cb_data.data = &unused; ++ ++ ctx = test_tail_ctx_create(&cb_data, ++ &moved_file[0], ++ sizeof(moved_file)/sizeof(char *), ++ FLB_FALSE); ++ if (!TEST_CHECK(ctx != NULL)) { ++ TEST_MSG("test_ctx_create failed"); ++ unlink(db); ++ exit(EXIT_FAILURE); ++ } ++ ++ ret = flb_input_set(ctx->flb, ctx->o_ffd, ++ "path", moved_file[0], ++ "read_from_head", "true", ++ "db", db, ++ "db.sync", "full", ++ "db.compare_filename", "true", ++ NULL); ++ TEST_CHECK(ret == 0); ++ ++ /* ++ * Start the engine ++ * The file has been newly created, and due to the 'db.compare_filename' ++ * option being set to true, it compares filenames to consider it a new ++ * file even if the inode is the same. If the option is set to false, ++ * it can be assumed to be the same file as before. ++ */ ++ ret = flb_start(ctx->flb); ++ TEST_CHECK(ret == 0); ++ ++ /* waiting to flush */ ++ flb_time_msleep(500); ++ ++ ret = write_msg(ctx, msg_moved, strlen(msg_moved)); ++ if (!TEST_CHECK(ret > 0)) { ++ test_tail_ctx_destroy(ctx); ++ unlink(db); ++ exit(EXIT_FAILURE); ++ } ++ ++ ret = write_msg(ctx, msg_end, strlen(msg_end)); ++ if (!TEST_CHECK(ret > 0)) { ++ test_tail_ctx_destroy(ctx); ++ unlink(db); ++ exit(EXIT_FAILURE); ++ } ++ ++ /* waiting to flush */ ++ flb_time_msleep(500); ++ ++ num = get_output_num(); ++ if (!TEST_CHECK(num == 3)) { ++ /* 3 = msg_init + msg_moved + msg_end */ ++ TEST_MSG("num error. 
expect=3 got=%d", num); ++ } ++ ++ test_tail_ctx_destroy(ctx); ++ unlink(db); ++} + #endif /* FLB_HAVE_SQLDB */ + + /* Test list */ +@@ -1758,6 +1905,7 @@ TEST_LIST = { + #ifdef FLB_HAVE_SQLDB + {"db", flb_test_db}, + {"db_delete_stale_file", flb_test_db_delete_stale_file}, ++ {"db_compare_filename", flb_test_db_compare_filename}, + #endif + + #ifdef in_tail diff --git a/SPECS/fluent-bit/fluent-bit.spec b/SPECS/fluent-bit/fluent-bit.spec index 51d5cf5e4d3..4501fe749d8 100644 --- a/SPECS/fluent-bit/fluent-bit.spec +++ b/SPECS/fluent-bit/fluent-bit.spec @@ -8,6 +8,7 @@ Distribution: Mariner URL: https://fluentbit.io Source0: https://github.com/fluent/%{name}/archive/refs/tags/v%{version}.tar.gz#/%{name}-%{version}.tar.gz Patch0: in_emitter_fix_issue_8198.patch +Patch1: fix_issue_8025.patch BuildRequires: bison BuildRequires: cmake BuildRequires: cyrus-sasl-devel @@ -83,6 +84,7 @@ Development files for %{name} %changelog * Thu Apr 16 2024 Sindhu Karri - 2.2.2-2 - Apply patch in_emitter_fix_issue_8198.patch to fix #8198 ( Potential log loss during high load at Multiline & Rewrite Tag Filter (in_emitter) ) +- Fix issue #8025 with a patch ( in_tail: missing log for offset processing due to non-existent old inodes in sqlite ) * Wed Apr 03 2024 CBL-Mariner Servicing Account - 2.2.2-1 - Auto-upgrade to 2.2.2 - CVE-2024-23722