From 4bfcc6be724b61e09d03eb281b3a64b888b21b2f Mon Sep 17 00:00:00 2001 From: "Boslet, Cory (cb645j)" Date: Wed, 27 Mar 2024 13:48:11 -0400 Subject: [PATCH] Add support to set otel fields from the event body based on keys. Signed-off-by: Cory Boslet --- plugins/out_opentelemetry/opentelemetry.c | 126 ++++++++++++++++++ plugins/out_opentelemetry/opentelemetry.h | 13 ++ .../out_opentelemetry/opentelemetry_conf.c | 32 +++++ 3 files changed, 171 insertions(+) diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index 55feb082b79..a44f3d82658 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -1080,6 +1080,110 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx, return 0; } +static int append_v1_logs_message(struct opentelemetry_context *ctx, + struct flb_log_event *event, + Opentelemetry__Proto__Logs__V1__LogRecord *log_record) +{ + struct flb_ra_value *ra_val; + + if (ctx == NULL || event == NULL || log_record == NULL) { + return -1; + } + + /* SeverityText */ + if (ctx->ra_severity_text_message) { + ra_val = flb_ra_get_value_object(ctx->ra_severity_text_message, *event->body); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_STR) { + if(is_valid_severity_text(ra_val->o.via.str.ptr, ra_val->o.via.str.size) == FLB_TRUE){ + log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size+1); + if (log_record->severity_text) { + strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size); + } + flb_ra_key_value_destroy(ra_val); + }else{ + flb_plg_warn(ctx->ins, "Unable to process %s. Invalid Severity Text.\n", ctx->ra_severity_text_message->pattern); + log_record->severity_text = NULL; + } + } + else { + /* To prevent invalid free */ + log_record->severity_text = NULL; + } + } + + /* SeverityNumber */ + if (ctx->ra_severity_number_message) { + ra_val = flb_ra_get_value_object(ctx->ra_severity_number_metadata, *event->body); + if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER && + is_valid_severity_number(ra_val->o.via.u64) == FLB_TRUE) { + log_record->severity_number = ra_val->o.via.u64; + flb_ra_key_value_destroy(ra_val); + } + }else if(ctx->ra_severity_text_message){ + //TODO get sev number based off sev text + } + + /* SpanId */ + if (ctx->ra_span_id_message) { + ra_val = flb_ra_get_value_object(ctx->ra_span_id_message, *event->body); + if (ra_val != NULL) { + if(ra_val->o.type == MSGPACK_OBJECT_BIN){ + log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size); + if (log_record->span_id.data) { + memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); + log_record->span_id.len = ra_val->o.via.bin.size; + } + }else if(ra_val->o.type == MSGPACK_OBJECT_STR){ + log_record->span_id.data = flb_calloc(8, sizeof(uint8_t)); + if (log_record->span_id.data) { + // Convert to a byte array + uint8_t val[8]; + for(size_t count = 0; count < sizeof val/sizeof *val; count++ ){ + sscanf(ra_val->o.via.str.ptr, "%2hhx", &val[count]); + ra_val->o.via.str.ptr+=2; + } + memcpy(log_record->span_id.data, val, sizeof(val)); + log_record->span_id.len = sizeof(val); + } + }else{ + flb_plg_warn(ctx->ins, "Unable to process %s. Unsupported data type.\n", ctx->ra_span_id_message->pattern); + } + flb_ra_key_value_destroy(ra_val); + } + } + + /* TraceId */ + if (ctx->ra_trace_id_message) { + ra_val = flb_ra_get_value_object(ctx->ra_trace_id_message, *event->body); + if (ra_val != NULL) { + if(ra_val->o.type == MSGPACK_OBJECT_BIN){ + log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size); + if (log_record->trace_id.data) { + memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); + log_record->trace_id.len = ra_val->o.via.bin.size; + } + }else if(ra_val->o.type == MSGPACK_OBJECT_STR){ + log_record->trace_id.data = flb_calloc(16, sizeof(uint8_t)); + if (log_record->trace_id.data) { + // Convert from hexdec string to a 16 byte array + uint8_t val[16]; + for(size_t count = 0; count < sizeof val/sizeof *val; count++ ){ + sscanf(ra_val->o.via.str.ptr, "%2hhx", &val[count]); + ra_val->o.via.str.ptr+=2; + } + memcpy(log_record->trace_id.data, val, sizeof(val)); + log_record->trace_id.len = sizeof(val); + } + }else{ + flb_plg_warn(ctx->ins, "Unable to process %s. Unsupported data type.\n", ctx->ra_trace_id_message->pattern); + } + flb_ra_key_value_destroy(ra_val); + } + } + + return 0; +} + static int process_logs(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, struct flb_input_instance *ins, void *out_context, @@ -1158,6 +1262,8 @@ static int process_logs(struct flb_event_chunk *event_chunk, append_v1_logs_metadata(ctx, &event, &log_records[log_record_count]); + append_v1_logs_message(ctx, &event, &log_records[log_record_count]); + ret = FLB_OK; log_records[log_record_count].time_unix_nano = flb_time_to_nanosec(&event.timestamp); @@ -1544,6 +1650,26 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_resource_metadata_key), "Specify a Resource key" }, + { + FLB_CONFIG_MAP_STR, "logs_span_id_message_key", "$SpanId", + 0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_span_id_message_key), + "Specify a SpanId key" + }, + { + FLB_CONFIG_MAP_STR, "logs_trace_id_message_key", "$TraceId", + 0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_trace_id_message_key), + "Specify a TraceId key" + }, + { + FLB_CONFIG_MAP_STR, "logs_severity_text_message_key", "$SeverityText", + 0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_severity_text_message_key), + "Specify a Severity Text key" + }, + { + FLB_CONFIG_MAP_STR, "logs_severity_number_message_key", "$SeverityNumber", + 0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_severity_number_message_key), + "Specify a Severity Number key" + }, /* EOF */ {0} diff --git a/plugins/out_opentelemetry/opentelemetry.h b/plugins/out_opentelemetry/opentelemetry.h index d3b62e0d819..d56901376e6 100644 --- a/plugins/out_opentelemetry/opentelemetry.h +++ b/plugins/out_opentelemetry/opentelemetry.h @@ -87,6 +87,19 @@ struct opentelemetry_context { flb_sds_t logs_instrumentation_scope_metadata_key; flb_sds_t logs_resource_metadata_key; + /* otel body keys */ + flb_sds_t logs_span_id_message_key; + struct flb_record_accessor *ra_span_id_message; + + flb_sds_t logs_trace_id_message_key; + struct flb_record_accessor *ra_trace_id_message; + + flb_sds_t logs_severity_text_message_key; + struct flb_record_accessor *ra_severity_text_message; + + flb_sds_t logs_severity_number_message_key; + struct flb_record_accessor *ra_severity_number_message; + /* Number of logs to flush at a time */ int batch_size; diff --git a/plugins/out_opentelemetry/opentelemetry_conf.c b/plugins/out_opentelemetry/opentelemetry_conf.c index 13680178c28..4e6a68fe153 100644 --- a/plugins/out_opentelemetry/opentelemetry_conf.c +++ b/plugins/out_opentelemetry/opentelemetry_conf.c @@ -420,6 +420,26 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output if (ctx->ra_attributes_metadata == NULL) { flb_plg_error(ins, "failed to create ra for attributes"); } + ctx->ra_span_id_message = flb_ra_create((char*)ctx->logs_span_id_message_key, + FLB_FALSE); + if (ctx->ra_span_id_message == NULL) { + flb_plg_error(ins, "failed to create ra for message span id"); + } + ctx->ra_trace_id_message = flb_ra_create((char*)ctx->logs_trace_id_message_key, + FLB_FALSE); + if (ctx->ra_trace_id_message == NULL) { + flb_plg_error(ins, "failed to create ra for message trace id"); + } + ctx->ra_severity_text_message = flb_ra_create((char*)ctx->logs_severity_text_message_key, + FLB_FALSE); + if (ctx->ra_severity_text_message == NULL) { + flb_plg_error(ins, "failed to create ra for message severity text"); + } + ctx->ra_severity_number_message = flb_ra_create((char*)ctx->logs_severity_number_message_key, + FLB_FALSE); + if (ctx->ra_severity_number_message == NULL) { + flb_plg_error(ins, "failed to create ra for message severity number"); + } return ctx; } @@ -466,6 +486,18 @@ void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx) if (ctx->ra_attributes_metadata) { flb_ra_destroy(ctx->ra_attributes_metadata); } + if (ctx->ra_span_id_message) { + flb_ra_destroy(ctx->ra_span_id_message); + } + if (ctx->ra_trace_id_message) { + flb_ra_destroy(ctx->ra_trace_id_message); + } + if (ctx->ra_severity_text_message) { + flb_ra_destroy(ctx->ra_severity_text_message); + } + if (ctx->ra_severity_number_message) { + flb_ra_destroy(ctx->ra_severity_number_message); + } flb_free(ctx->proxy_host); flb_free(ctx);