diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index 56aa8769bd9..91a4c6f513a 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -37,6 +37,10 @@ #include #include +#include +#include +#include + extern cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt); extern void cmt_encode_opentelemetry_destroy(cfl_sds_t text); @@ -643,6 +647,88 @@ static int process_traces(struct flb_event_chunk *event_chunk, return result; } +static int process_profiles(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *ins, void *out_context, + struct flb_config *config) +{ + int ret; + int result; + cfl_sds_t encoded_chunk; + flb_sds_t buf = NULL; + size_t off = 0; + struct cprof *profiles_context; + struct opentelemetry_context *ctx = out_context; + + /* Initialize vars */ + ctx = out_context; + result = FLB_OK; + + buf = flb_sds_create_size(event_chunk->size); + if (!buf) { + flb_plg_error(ctx->ins, "could not allocate outgoing buffer"); + return FLB_RETRY; + } + + flb_plg_debug(ctx->ins, "cprofiles msgpack size: %lu", + event_chunk->size); + + while (cprof_decode_msgpack_create(&profiles_context, + (unsigned char *) event_chunk->data, + event_chunk->size, &off) == 0) { + /* Create a OpenTelemetry payload */ + ret = cprof_encode_opentelemetry_create(&encoded_chunk, profiles_context); + if (ret != CPROF_ENCODE_OPENTELEMETRY_SUCCESS) { + flb_plg_error(ctx->ins, + "Error encoding context as opentelemetry"); + result = FLB_ERROR; + cprof_decode_msgpack_destroy(profiles_context); + goto exit; + } + + /* concat buffer */ + ret = flb_sds_cat_safe(&buf, encoded_chunk, flb_sds_len(encoded_chunk)); + if (ret != 0) { + flb_plg_error(ctx->ins, "Error appending encoded profiles to buffer"); + result = FLB_ERROR; + cprof_encode_opentelemetry_destroy(encoded_chunk); + cprof_decode_msgpack_destroy(profiles_context); + goto exit; + } + + /* release */ + cprof_encode_opentelemetry_destroy(encoded_chunk); + cprof_decode_msgpack_destroy(profiles_context); + } + + flb_plg_debug(ctx->ins, "final payload size: %lu", flb_sds_len(buf)); + if (buf && flb_sds_len(buf) > 0) { + /* Send HTTP request */ + result = opentelemetry_post(ctx, buf, flb_sds_len(buf), + event_chunk->tag, + flb_sds_len(event_chunk->tag), + ctx->profiles_uri_sanitized, + ctx->grpc_profiles_uri); + + /* Debug http_post() result statuses */ + if (result == FLB_OK) { + flb_plg_debug(ctx->ins, "http_post result FLB_OK"); + } + else if (result == FLB_ERROR) { + flb_plg_debug(ctx->ins, "http_post result FLB_ERROR"); + } + else if (result == FLB_RETRY) { + flb_plg_debug(ctx->ins, "http_post result FLB_RETRY"); + } + } + +exit: + if (buf) { + flb_sds_destroy(buf); + } + return result; +} + static int cb_opentelemetry_exit(void *data, struct flb_config *config) { struct opentelemetry_context *ctx; @@ -690,6 +776,9 @@ static void cb_opentelemetry_flush(struct flb_event_chunk *event_chunk, else if (event_chunk->type == FLB_INPUT_TRACES){ result = process_traces(event_chunk, out_flush, ins, out_context, config); } + else if (event_chunk->type == FLB_INPUT_PROFILES){ + result = process_profiles(event_chunk, out_flush, ins, out_context, config); + } FLB_OUTPUT_RETURN(result); } @@ -788,11 +877,24 @@ static struct flb_config_map config_map[] = { "Specify an optional HTTP URI for the target OTel endpoint." }, { - FLB_CONFIG_MAP_STR, "grpc_traces_uri", "/opentelemetry.proto.collector.trace.v1.TraceService/Export", + FLB_CONFIG_MAP_STR, "grpc_traces_uri", + "/opentelemetry.proto.collector.trace.v1.TraceService/Export", 0, FLB_TRUE, offsetof(struct opentelemetry_context, grpc_traces_uri), "Specify an optional gRPC URI for the target OTel endpoint." }, + { + FLB_CONFIG_MAP_STR, "profiles_uri", "/v1development/profiles", + 0, FLB_TRUE, offsetof(struct opentelemetry_context, profiles_uri), + "Specify an optional HTTP URI for the profiles OTel endpoint." + }, + { + FLB_CONFIG_MAP_STR, "grpc_profiles_uri", + "/opentelemetry.proto.collector.profiles.v1experimental.ProfilesService/Export", + 0, FLB_TRUE, offsetof(struct opentelemetry_context, grpc_profiles_uri), + "Specify an optional gRPC URI for the profiles OTel endpoint." + }, + { FLB_CONFIG_MAP_BOOL, "log_response_payload", "true", 0, FLB_TRUE, offsetof(struct opentelemetry_context, log_response_payload), @@ -886,7 +988,7 @@ struct flb_output_plugin out_opentelemetry_plugin = { .cb_flush = cb_opentelemetry_flush, .cb_exit = cb_opentelemetry_exit, .config_map = config_map, - .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES, + .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES | FLB_OUTPUT_PROFILES, .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, .test_formatter.callback = opentelemetry_format_test, diff --git a/plugins/out_opentelemetry/opentelemetry.h b/plugins/out_opentelemetry/opentelemetry.h index dd1839c34fc..4923bec9cfa 100644 --- a/plugins/out_opentelemetry/opentelemetry.h +++ b/plugins/out_opentelemetry/opentelemetry.h @@ -58,11 +58,14 @@ struct opentelemetry_context { int proxy_port; /* HTTP URI */ + char *profiles_uri_sanitized; char *traces_uri_sanitized; char *metrics_uri_sanitized; char *logs_uri_sanitized; char *traces_uri; char *grpc_traces_uri; + char *profiles_uri; + char *grpc_profiles_uri; char *metrics_uri; char *grpc_metrics_uri; char *logs_uri; diff --git a/plugins/out_opentelemetry/opentelemetry_conf.c b/plugins/out_opentelemetry/opentelemetry_conf.c index 97cb5f1c1af..46773aeaeb0 100644 --- a/plugins/out_opentelemetry/opentelemetry_conf.c +++ b/plugins/out_opentelemetry/opentelemetry_conf.c @@ -332,6 +332,7 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output ctx->logs_uri_sanitized = sanitize_uri(ctx->logs_uri); ctx->traces_uri_sanitized = sanitize_uri(ctx->traces_uri); ctx->metrics_uri_sanitized = sanitize_uri(ctx->metrics_uri); + ctx->profiles_uri_sanitized = sanitize_uri(ctx->profiles_uri); if (ctx->logs_uri_sanitized == NULL) { flb_plg_trace(ctx->ins, @@ -363,6 +364,16 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output return NULL; } + if (ctx->profiles_uri_sanitized == NULL) { + flb_plg_trace(ctx->ins, + "Could not allocate memory for sanitized " + "profiles endpoint uri"); + + flb_opentelemetry_context_destroy(ctx); + + return NULL; + } + /* list of 'logs_body_key' */ ret = log_body_key_list_create(ctx); if (ret != 0) { @@ -604,6 +615,10 @@ void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx) flb_free(ctx->metrics_uri_sanitized); } + if (ctx->profiles_uri_sanitized != NULL && ctx->profiles_uri_sanitized != ctx->profiles_uri) { + flb_free(ctx->profiles_uri_sanitized); + } + /* release log_body_key_list */ log_body_key_list_destroy(ctx);