From 54734754989fb3e5b0c3d38bb57943178a81df14 Mon Sep 17 00:00:00 2001 From: Richard Treu Date: Wed, 23 Oct 2024 20:24:30 +0200 Subject: [PATCH] filter_multiline: Add support for processors This commit adds the support to re-emit the processed multiline records back into the processor pipeline Signed-off-by: Richard Treu --- plugins/filter_multiline/ml.c | 51 +++++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/plugins/filter_multiline/ml.c b/plugins/filter_multiline/ml.c index 4c5130a1b74..41559296d05 100644 --- a/plugins/filter_multiline/ml.c +++ b/plugins/filter_multiline/ml.c @@ -17,6 +17,9 @@ * limitations under the License. */ +#include +#include +#include #include #include #include @@ -148,6 +151,36 @@ static int multiline_load_parsers(struct ml_ctx *ctx) return 0; } +static int ingest_inline(struct ml_ctx *ctx, + flb_sds_t out_tag, + const void *buf, size_t buf_size) +{ + struct flb_input_instance *input_instance; + struct flb_processor_unit *processor_unit; + struct flb_processor *processor; + int result; + if (ctx->ins->parent_processor != NULL) { + processor_unit = (struct flb_processor_unit *) \ + ctx->ins->parent_processor; + processor = (struct flb_processor *) processor_unit->parent; + input_instance = (struct flb_input_instance *) processor->data; + + if (processor->source_plugin_type == FLB_PLUGIN_INPUT) { + result = flb_input_log_append_skip_processor_stages( + input_instance, + processor_unit->stage + 1, + out_tag, flb_sds_len(out_tag), + buf, buf_size); + + if (result == 0) { + return FLB_TRUE; + } + } + } + + return FLB_FALSE; +} + static int flush_callback(struct flb_ml_parser *parser, struct flb_ml_stream *mst, void *data, char *buf_data, size_t buf_size) @@ -175,8 +208,14 @@ static int flush_callback(struct flb_ml_parser *parser, /* Emit record with original tag */ flb_plg_trace(ctx->ins, "emitting from %s to %s", stream->input_name, stream->tag); - ret = in_emitter_add_record(stream->tag, flb_sds_len(stream->tag), buf_data, buf_size, + ret = ingest_inline(ctx, stream->tag, buf_data, buf_size); + if (!ret) { + ret = in_emitter_add_record(stream->tag, flb_sds_len(stream->tag), buf_data, buf_size, ctx->ins_emitter, ctx->i_ins); + } + else { + ret = 0; + } return ret; } @@ -525,11 +564,19 @@ static void partial_timer_cb(struct flb_config *config, void *data) packer->log_encoder.output_length > 0) { flb_plg_trace(ctx->ins, "emitting from %s to %s", packer->input_name, packer->tag); - ret = in_emitter_add_record(packer->tag, flb_sds_len(packer->tag), + + ret = ingest_inline(ctx, packer->tag, packer->log_encoder.output_buffer, + packer->log_encoder.output_length); + if (!ret) { + ret = in_emitter_add_record(packer->tag, flb_sds_len(packer->tag), packer->log_encoder.output_buffer, packer->log_encoder.output_length, ctx->ins_emitter, ctx->i_ins); + } + else { + ret = 0; + } if (ret < 0) { /* this shouldn't happen in normal execution */ flb_plg_warn(ctx->ins,