From 37cb10d6d9e92569b7d5daede752015c7f5788ed Mon Sep 17 00:00:00 2001 From: braydonk Date: Tue, 28 Nov 2023 21:35:15 +0000 Subject: [PATCH 1/2] input_chunk: handle filter_do edge case flb_filter_do may modify the input chunk's total records, meaning that if there is a filter in the pipeline the total records could be double-counted which breaks anything that relies on and event chunk's `total_records`. Signed-off-by: braydonk --- src/flb_input_chunk.c | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 7437ca77654..92f7dc02138 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -1376,7 +1376,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, const char *tag, size_t tag_len, const void *buf, size_t buf_size) { - int ret; + int ret, total_records_start; int set_down = FLB_FALSE; int min; int new_chunk = FLB_FALSE; @@ -1495,6 +1495,15 @@ static int input_chunk_append_raw(struct flb_input_instance *in, pre_real_size = flb_input_chunk_get_real_size(ic); } + /* + * Set the total_records based on the records that n_records + * says we should be writing. This may be modified by flb_filter_do, + * where a filter may add/remove records. + */ + total_records_start = ic->total_records; + ic->added_records = n_records; + ic->total_records += n_records; + #ifdef FLB_HAVE_CHUNK_TRACE flb_chunk_trace_do_input(ic); #endif /* FLB_HAVE_CHUNK_TRACE */ @@ -1530,9 +1539,13 @@ static int input_chunk_append_raw(struct flb_input_instance *in, flb_free(filtered_data_buffer); } - if (ret == CIO_OK) { - ic->added_records = n_records; - ic->total_records += n_records; + /* + * If the write failed, then we did not add any records. Reset + * the record counters to reflect this. + */ + if (ret != CIO_OK) { + ic->added_records = 0; + ic->total_records = total_records_start; } /* Update 'input' metrics */ From 8543d126d287217567af15d16fdd0e42a56c71a3 Mon Sep 17 00:00:00 2001 From: braydonk Date: Tue, 28 Nov 2023 23:47:26 +0000 Subject: [PATCH 2/2] input_chunk: clarify comment Clarify comment to demonstrate that flb_filter_do actually overwrites the value not just changing it. Signed-off-by: braydonk --- src/flb_input_chunk.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 92f7dc02138..079bfb26d90 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -1497,8 +1497,8 @@ static int input_chunk_append_raw(struct flb_input_instance *in, /* * Set the total_records based on the records that n_records - * says we should be writing. This may be modified by flb_filter_do, - * where a filter may add/remove records. + * says we should be writing. These values may be overwritten + * flb_filter_do, where a filter may add/remove records. */ total_records_start = ic->total_records; ic->added_records = n_records;