Skip to content

Commit

Permalink
filter_multiline: Add support for processors
Browse files Browse the repository at this point in the history
This commit adds the support to re-emit the processed multiline records back into the processor pipeline

Signed-off-by: Richard Treu <[email protected]>
  • Loading branch information
drbugfinder-work authored and edsiper committed Nov 4, 2024
1 parent 2657b81 commit 5473475
Showing 1 changed file with 49 additions and 2 deletions.
51 changes: 49 additions & 2 deletions plugins/filter_multiline/ml.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
* limitations under the License.
*/

#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_plugin.h>
#include <fluent-bit/flb_processor.h>
#include <fluent-bit/flb_filter_plugin.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_time.h>
Expand Down Expand Up @@ -148,6 +151,36 @@ static int multiline_load_parsers(struct ml_ctx *ctx)
return 0;
}

static int ingest_inline(struct ml_ctx *ctx,
flb_sds_t out_tag,
const void *buf, size_t buf_size)
{
struct flb_input_instance *input_instance;
struct flb_processor_unit *processor_unit;
struct flb_processor *processor;
int result;
if (ctx->ins->parent_processor != NULL) {
processor_unit = (struct flb_processor_unit *) \
ctx->ins->parent_processor;
processor = (struct flb_processor *) processor_unit->parent;
input_instance = (struct flb_input_instance *) processor->data;

if (processor->source_plugin_type == FLB_PLUGIN_INPUT) {
result = flb_input_log_append_skip_processor_stages(
input_instance,
processor_unit->stage + 1,
out_tag, flb_sds_len(out_tag),
buf, buf_size);

if (result == 0) {
return FLB_TRUE;
}
}
}

return FLB_FALSE;
}

static int flush_callback(struct flb_ml_parser *parser,
struct flb_ml_stream *mst,
void *data, char *buf_data, size_t buf_size)
Expand Down Expand Up @@ -175,8 +208,14 @@ static int flush_callback(struct flb_ml_parser *parser,

/* Emit record with original tag */
flb_plg_trace(ctx->ins, "emitting from %s to %s", stream->input_name, stream->tag);
ret = in_emitter_add_record(stream->tag, flb_sds_len(stream->tag), buf_data, buf_size,
ret = ingest_inline(ctx, stream->tag, buf_data, buf_size);
if (!ret) {
ret = in_emitter_add_record(stream->tag, flb_sds_len(stream->tag), buf_data, buf_size,
ctx->ins_emitter, ctx->i_ins);
}
else {
ret = 0;
}

return ret;
}
Expand Down Expand Up @@ -525,11 +564,19 @@ static void partial_timer_cb(struct flb_config *config, void *data)
packer->log_encoder.output_length > 0) {

flb_plg_trace(ctx->ins, "emitting from %s to %s", packer->input_name, packer->tag);
ret = in_emitter_add_record(packer->tag, flb_sds_len(packer->tag),

ret = ingest_inline(ctx, packer->tag, packer->log_encoder.output_buffer,
packer->log_encoder.output_length);
if (!ret) {
ret = in_emitter_add_record(packer->tag, flb_sds_len(packer->tag),
packer->log_encoder.output_buffer,
packer->log_encoder.output_length,
ctx->ins_emitter,
ctx->i_ins);
}
else {
ret = 0;
}
if (ret < 0) {
/* this shouldn't happen in normal execution */
flb_plg_warn(ctx->ins,
Expand Down

0 comments on commit 5473475

Please sign in to comment.