diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 7437ca77654..079bfb26d90 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -1376,7 +1376,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, const char *tag, size_t tag_len, const void *buf, size_t buf_size) { - int ret; + int ret, total_records_start; int set_down = FLB_FALSE; int min; int new_chunk = FLB_FALSE; @@ -1495,6 +1495,15 @@ static int input_chunk_append_raw(struct flb_input_instance *in, pre_real_size = flb_input_chunk_get_real_size(ic); } + /* + * Set the total_records based on the records that n_records + * says we should be writing. These values may be overwritten + * flb_filter_do, where a filter may add/remove records. + */ + total_records_start = ic->total_records; + ic->added_records = n_records; + ic->total_records += n_records; + #ifdef FLB_HAVE_CHUNK_TRACE flb_chunk_trace_do_input(ic); #endif /* FLB_HAVE_CHUNK_TRACE */ @@ -1530,9 +1539,13 @@ static int input_chunk_append_raw(struct flb_input_instance *in, flb_free(filtered_data_buffer); } - if (ret == CIO_OK) { - ic->added_records = n_records; - ic->total_records += n_records; + /* + * If the write failed, then we did not add any records. Reset + * the record counters to reflect this. + */ + if (ret != CIO_OK) { + ic->added_records = 0; + ic->total_records = total_records_start; } /* Update 'input' metrics */