Skip to content

Commit

Permalink
out_opentelemetry: profiles support added
Browse files Browse the repository at this point in the history
Signed-off-by: Leonardo Alminana <[email protected]>
  • Loading branch information
leonardo-albertovich committed Dec 19, 2024
1 parent b1f120f commit 01f82a2
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 2 deletions.
106 changes: 104 additions & 2 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
#include <ctraces/ctraces.h>
#include <ctraces/ctr_decode_msgpack.h>

#include <cprofiles/cprofiles.h>
#include <cprofiles/cprof_decode_msgpack.h>
#include <cprofiles/cprof_encode_opentelemetry.h>

extern cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt);
extern void cmt_encode_opentelemetry_destroy(cfl_sds_t text);

Expand Down Expand Up @@ -643,6 +647,88 @@ static int process_traces(struct flb_event_chunk *event_chunk,
return result;
}

static int process_profiles(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
struct flb_input_instance *ins, void *out_context,
struct flb_config *config)
{
int ret;
int result;
cfl_sds_t encoded_chunk;
flb_sds_t buf = NULL;
size_t off = 0;
struct cprof *profiles_context;
struct opentelemetry_context *ctx = out_context;

/* Initialize vars */
ctx = out_context;
result = FLB_OK;

buf = flb_sds_create_size(event_chunk->size);
if (!buf) {
flb_plg_error(ctx->ins, "could not allocate outgoing buffer");
return FLB_RETRY;
}

flb_plg_debug(ctx->ins, "cprofiles msgpack size: %lu",
event_chunk->size);

while (cprof_decode_msgpack_create(&profiles_context,
(unsigned char *) event_chunk->data,
event_chunk->size, &off) == 0) {
/* Create a OpenTelemetry payload */
ret = cprof_encode_opentelemetry_create(&encoded_chunk, profiles_context);
if (ret != CPROF_ENCODE_OPENTELEMETRY_SUCCESS) {
flb_plg_error(ctx->ins,
"Error encoding context as opentelemetry");
result = FLB_ERROR;
cprof_decode_msgpack_destroy(profiles_context);
goto exit;
}

/* concat buffer */
ret = flb_sds_cat_safe(&buf, encoded_chunk, flb_sds_len(encoded_chunk));
if (ret != 0) {
flb_plg_error(ctx->ins, "Error appending encoded profiles to buffer");
result = FLB_ERROR;
cprof_encode_opentelemetry_destroy(encoded_chunk);
cprof_decode_msgpack_destroy(profiles_context);
goto exit;
}

/* release */
cprof_encode_opentelemetry_destroy(encoded_chunk);
cprof_decode_msgpack_destroy(profiles_context);
}

flb_plg_debug(ctx->ins, "final payload size: %lu", flb_sds_len(buf));
if (buf && flb_sds_len(buf) > 0) {
/* Send HTTP request */
result = opentelemetry_post(ctx, buf, flb_sds_len(buf),
event_chunk->tag,
flb_sds_len(event_chunk->tag),
ctx->profiles_uri_sanitized,
ctx->grpc_profiles_uri);

/* Debug http_post() result statuses */
if (result == FLB_OK) {
flb_plg_debug(ctx->ins, "http_post result FLB_OK");
}
else if (result == FLB_ERROR) {
flb_plg_debug(ctx->ins, "http_post result FLB_ERROR");
}
else if (result == FLB_RETRY) {
flb_plg_debug(ctx->ins, "http_post result FLB_RETRY");
}
}

exit:
if (buf) {
flb_sds_destroy(buf);
}
return result;
}

static int cb_opentelemetry_exit(void *data, struct flb_config *config)
{
struct opentelemetry_context *ctx;
Expand Down Expand Up @@ -690,6 +776,9 @@ static void cb_opentelemetry_flush(struct flb_event_chunk *event_chunk,
else if (event_chunk->type == FLB_INPUT_TRACES){
result = process_traces(event_chunk, out_flush, ins, out_context, config);
}
else if (event_chunk->type == FLB_INPUT_PROFILES){
result = process_profiles(event_chunk, out_flush, ins, out_context, config);
}

FLB_OUTPUT_RETURN(result);
}
Expand Down Expand Up @@ -788,11 +877,24 @@ static struct flb_config_map config_map[] = {
"Specify an optional HTTP URI for the target OTel endpoint."
},
{
FLB_CONFIG_MAP_STR, "grpc_traces_uri", "/opentelemetry.proto.collector.trace.v1.TraceService/Export",
FLB_CONFIG_MAP_STR, "grpc_traces_uri",
"/opentelemetry.proto.collector.trace.v1.TraceService/Export",
0, FLB_TRUE, offsetof(struct opentelemetry_context, grpc_traces_uri),
"Specify an optional gRPC URI for the target OTel endpoint."
},

{
FLB_CONFIG_MAP_STR, "profiles_uri", "/v1development/profiles",
0, FLB_TRUE, offsetof(struct opentelemetry_context, profiles_uri),
"Specify an optional HTTP URI for the profiles OTel endpoint."
},
{
FLB_CONFIG_MAP_STR, "grpc_profiles_uri",
"/opentelemetry.proto.collector.profiles.v1experimental.ProfilesService/Export",
0, FLB_TRUE, offsetof(struct opentelemetry_context, grpc_profiles_uri),
"Specify an optional gRPC URI for the profiles OTel endpoint."
},

{
FLB_CONFIG_MAP_BOOL, "log_response_payload", "true",
0, FLB_TRUE, offsetof(struct opentelemetry_context, log_response_payload),
Expand Down Expand Up @@ -886,7 +988,7 @@ struct flb_output_plugin out_opentelemetry_plugin = {
.cb_flush = cb_opentelemetry_flush,
.cb_exit = cb_opentelemetry_exit,
.config_map = config_map,
.event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES,
.event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES | FLB_OUTPUT_PROFILES,
.flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,

.test_formatter.callback = opentelemetry_format_test,
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ struct opentelemetry_context {
int proxy_port;

/* HTTP URI */
char *profiles_uri_sanitized;
char *traces_uri_sanitized;
char *metrics_uri_sanitized;
char *logs_uri_sanitized;
char *traces_uri;
char *grpc_traces_uri;
char *profiles_uri;
char *grpc_profiles_uri;
char *metrics_uri;
char *grpc_metrics_uri;
char *logs_uri;
Expand Down
15 changes: 15 additions & 0 deletions plugins/out_opentelemetry/opentelemetry_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output
ctx->logs_uri_sanitized = sanitize_uri(ctx->logs_uri);
ctx->traces_uri_sanitized = sanitize_uri(ctx->traces_uri);
ctx->metrics_uri_sanitized = sanitize_uri(ctx->metrics_uri);
ctx->profiles_uri_sanitized = sanitize_uri(ctx->profiles_uri);

if (ctx->logs_uri_sanitized == NULL) {
flb_plg_trace(ctx->ins,
Expand Down Expand Up @@ -363,6 +364,16 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output
return NULL;
}

if (ctx->profiles_uri_sanitized == NULL) {
flb_plg_trace(ctx->ins,
"Could not allocate memory for sanitized "
"profiles endpoint uri");

flb_opentelemetry_context_destroy(ctx);

return NULL;
}

/* list of 'logs_body_key' */
ret = log_body_key_list_create(ctx);
if (ret != 0) {
Expand Down Expand Up @@ -604,6 +615,10 @@ void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx)
flb_free(ctx->metrics_uri_sanitized);
}

if (ctx->profiles_uri_sanitized != NULL && ctx->profiles_uri_sanitized != ctx->profiles_uri) {
flb_free(ctx->profiles_uri_sanitized);
}

/* release log_body_key_list */
log_body_key_list_destroy(ctx);

Expand Down

0 comments on commit 01f82a2

Please sign in to comment.