From cad28cb950e693eae4b642bbe23da7ac4e054758 Mon Sep 17 00:00:00 2001 From: Leonardo Alminana Date: Wed, 18 Dec 2024 21:52:42 +0100 Subject: [PATCH] in_opentelemetry: added proper profile event ingestion support Signed-off-by: Leonardo Alminana --- plugins/in_opentelemetry/opentelemetry_prot.c | 85 ++----------------- 1 file changed, 9 insertions(+), 76 deletions(-) diff --git a/plugins/in_opentelemetry/opentelemetry_prot.c b/plugins/in_opentelemetry/opentelemetry_prot.c index a99ca12fb89..a471c52f996 100644 --- a/plugins/in_opentelemetry/opentelemetry_prot.c +++ b/plugins/in_opentelemetry/opentelemetry_prot.c @@ -2675,36 +2675,22 @@ static int process_payload_profiles_ng(struct flb_opentelemetry *ctx, struct flb_http_request *request, struct flb_http_response *response) { - cfl_sds_t text_encoded_profiles_context; - struct cprof *profiles_context; - struct flb_log_event_encoder *encoder; - size_t offset; - int ret; - - encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2); - - if (encoder == NULL) { - return -1; - } + struct cprof *profiles_context; + size_t offset; + int ret; if (request->content_type == NULL) { - flb_log_event_encoder_destroy(encoder); - flb_error("[otel] content type missing"); return -1; } else if (strcasecmp(request->content_type, "application/json") == 0) { - flb_log_event_encoder_destroy(encoder); - flb_error("[otel] unsuported profiles encoding type : %s", request->content_type); return -1; } else if (strcasecmp(request->content_type, "application/x-protobuf") == 0) { - flb_log_event_encoder_destroy(encoder); - flb_error("[otel] unsuported profiles encoding type : %s", request->content_type); @@ -2712,8 +2698,6 @@ static int process_payload_profiles_ng(struct flb_opentelemetry *ctx, } else if (strcasecmp(request->content_type, "application/grpc") == 0) { if (cfl_sds_len(request->body) < 5) { - flb_log_event_encoder_destroy(encoder); - flb_error("[otel] malformed grpc packet of size %zu", cfl_sds_len(request->body)); @@ -2729,69 +2713,21 @@ static int process_payload_profiles_ng(struct flb_opentelemetry *ctx, &offset); if (ret != CPROF_DECODE_OPENTELEMETRY_SUCCESS) { - flb_log_event_encoder_destroy(encoder); - flb_error("[otel] profile decoding error : %d", ret); return -1; } - ret = cprof_encode_text_create(&text_encoded_profiles_context, profiles_context); + ret = flb_input_profiles_append(ctx->ins, + tag, + flb_sds_len(tag), + profiles_context); cprof_decode_opentelemetry_destroy(profiles_context); - if (ret != CPROF_DECODE_OPENTELEMETRY_SUCCESS) { - flb_log_event_encoder_destroy(encoder); - - flb_error("[otel] profile text encoding error : %d", - ret); - - return -1; - } - - flb_log_event_encoder_begin_record(encoder); - - flb_log_event_encoder_set_current_timestamp(encoder); - - ret = flb_log_event_encoder_append_body_values( - encoder, - FLB_LOG_EVENT_CSTRING_VALUE("Profile"), - FLB_LOG_EVENT_STRING_VALUE(text_encoded_profiles_context, - cfl_sds_len(text_encoded_profiles_context))); - - cprof_encode_text_destroy(text_encoded_profiles_context); - - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_log_event_encoder_destroy(encoder); - - flb_error("[otel] re encoded profile ingestion error : %d", - ret); - - return -1; - } - - ret = flb_log_event_encoder_commit_record(encoder); - - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_log_event_encoder_destroy(encoder); - - flb_error("[otel] re encoded profile ingestion error : %d", - ret); - - return -1; - } - - ret = flb_input_log_append(ctx->ins, - tag, - flb_sds_len(tag), - encoder->output_buffer, - encoder->output_length); - - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_log_event_encoder_destroy(encoder); - - flb_error("[otel] re encoded profile ingestion error : %d", + if (ret != 0) { + flb_error("[otel] profile ingestion error : %d", ret); return -1; @@ -2805,9 +2741,6 @@ static int process_payload_profiles_ng(struct flb_opentelemetry *ctx, ret = -1; } - - flb_log_event_encoder_destroy(encoder); - return ret; }