Skip to content

Commit

Permalink
input_chunk: handle filter_do edge case (#8229)
Browse files Browse the repository at this point in the history
* input_chunk: handle filter_do edge case

flb_filter_do may modify the input chunk's total records, meaning that
if there is a filter in the pipeline the total records could be
  double-counted which breaks anything that relies on and event chunk's
  `total_records`.

Signed-off-by: braydonk <[email protected]>

* input_chunk: clarify comment

Clarify comment to demonstrate that flb_filter_do actually overwrites
the value not just changing it.

Signed-off-by: braydonk <[email protected]>
  • Loading branch information
braydonk authored Dec 16, 2023
1 parent 2612a1a commit 8850ee1
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size)
{
int ret;
int ret, total_records_start;
int set_down = FLB_FALSE;
int min;
int new_chunk = FLB_FALSE;
Expand Down Expand Up @@ -1495,6 +1495,15 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
pre_real_size = flb_input_chunk_get_real_size(ic);
}

/*
* Set the total_records based on the records that n_records
* says we should be writing. These values may be overwritten
* flb_filter_do, where a filter may add/remove records.
*/
total_records_start = ic->total_records;
ic->added_records = n_records;
ic->total_records += n_records;

#ifdef FLB_HAVE_CHUNK_TRACE
flb_chunk_trace_do_input(ic);
#endif /* FLB_HAVE_CHUNK_TRACE */
Expand Down Expand Up @@ -1530,9 +1539,13 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
flb_free(filtered_data_buffer);
}

if (ret == CIO_OK) {
ic->added_records = n_records;
ic->total_records += n_records;
/*
* If the write failed, then we did not add any records. Reset
* the record counters to reflect this.
*/
if (ret != CIO_OK) {
ic->added_records = 0;
ic->total_records = total_records_start;
}

/* Update 'input' metrics */
Expand Down

0 comments on commit 8850ee1

Please sign in to comment.