Skip to content

Commit

Permalink
in_opentelemetry: added proper profile event ingestion support
Browse files Browse the repository at this point in the history
Signed-off-by: Leonardo Alminana <[email protected]>
  • Loading branch information
leonardo-albertovich committed Dec 18, 2024
1 parent f8fb651 commit cad28cb
Showing 1 changed file with 9 additions and 76 deletions.
85 changes: 9 additions & 76 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -2675,45 +2675,29 @@ static int process_payload_profiles_ng(struct flb_opentelemetry *ctx,
struct flb_http_request *request,
struct flb_http_response *response)
{
cfl_sds_t text_encoded_profiles_context;
struct cprof *profiles_context;
struct flb_log_event_encoder *encoder;
size_t offset;
int ret;

encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2);

if (encoder == NULL) {
return -1;
}
struct cprof *profiles_context;
size_t offset;
int ret;

if (request->content_type == NULL) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] content type missing");

return -1;
}
else if (strcasecmp(request->content_type, "application/json") == 0) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] unsuported profiles encoding type : %s",
request->content_type);

return -1;
}
else if (strcasecmp(request->content_type, "application/x-protobuf") == 0) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] unsuported profiles encoding type : %s",
request->content_type);

return -1;
}
else if (strcasecmp(request->content_type, "application/grpc") == 0) {
if (cfl_sds_len(request->body) < 5) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] malformed grpc packet of size %zu",
cfl_sds_len(request->body));

Expand All @@ -2729,69 +2713,21 @@ static int process_payload_profiles_ng(struct flb_opentelemetry *ctx,
&offset);

if (ret != CPROF_DECODE_OPENTELEMETRY_SUCCESS) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] profile decoding error : %d",
ret);

return -1;
}

ret = cprof_encode_text_create(&text_encoded_profiles_context, profiles_context);
ret = flb_input_profiles_append(ctx->ins,
tag,
flb_sds_len(tag),
profiles_context);

cprof_decode_opentelemetry_destroy(profiles_context);

if (ret != CPROF_DECODE_OPENTELEMETRY_SUCCESS) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] profile text encoding error : %d",
ret);

return -1;
}

flb_log_event_encoder_begin_record(encoder);

flb_log_event_encoder_set_current_timestamp(encoder);

ret = flb_log_event_encoder_append_body_values(
encoder,
FLB_LOG_EVENT_CSTRING_VALUE("Profile"),
FLB_LOG_EVENT_STRING_VALUE(text_encoded_profiles_context,
cfl_sds_len(text_encoded_profiles_context)));

cprof_encode_text_destroy(text_encoded_profiles_context);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] re encoded profile ingestion error : %d",
ret);

return -1;
}

ret = flb_log_event_encoder_commit_record(encoder);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] re encoded profile ingestion error : %d",
ret);

return -1;
}

ret = flb_input_log_append(ctx->ins,
tag,
flb_sds_len(tag),
encoder->output_buffer,
encoder->output_length);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] re encoded profile ingestion error : %d",
if (ret != 0) {
flb_error("[otel] profile ingestion error : %d",
ret);

return -1;
Expand All @@ -2805,9 +2741,6 @@ static int process_payload_profiles_ng(struct flb_opentelemetry *ctx,
ret = -1;
}


flb_log_event_encoder_destroy(encoder);

return ret;
}

Expand Down

0 comments on commit cad28cb

Please sign in to comment.