Skip to content

Commit

Permalink
Add support to set otel fields from the event body based on keys. Sig…
Browse files Browse the repository at this point in the history
…ned-off-by: Cory Boslet <[email protected]>
  • Loading branch information
cb645j committed Mar 27, 2024
1 parent 20c461a commit 4bfcc6b
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 0 deletions.
126 changes: 126 additions & 0 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,110 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx,
return 0;
}

static int append_v1_logs_message(struct opentelemetry_context *ctx,
struct flb_log_event *event,
Opentelemetry__Proto__Logs__V1__LogRecord *log_record)
{
struct flb_ra_value *ra_val;

if (ctx == NULL || event == NULL || log_record == NULL) {
return -1;
}

/* SeverityText */
if (ctx->ra_severity_text_message) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_text_message, *event->body);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_STR) {
if(is_valid_severity_text(ra_val->o.via.str.ptr, ra_val->o.via.str.size) == FLB_TRUE){
log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size+1);
if (log_record->severity_text) {
strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size);
}
flb_ra_key_value_destroy(ra_val);
}else{
flb_plg_warn(ctx->ins, "Unable to process %s. Invalid Severity Text.\n", ctx->ra_severity_text_message->pattern);
log_record->severity_text = NULL;
}
}
else {
/* To prevent invalid free */
log_record->severity_text = NULL;
}
}

/* SeverityNumber */
if (ctx->ra_severity_number_message) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_number_metadata, *event->body);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER &&
is_valid_severity_number(ra_val->o.via.u64) == FLB_TRUE) {
log_record->severity_number = ra_val->o.via.u64;
flb_ra_key_value_destroy(ra_val);
}
}else if(ctx->ra_severity_text_message){
//TODO get sev number based off sev text
}

/* SpanId */
if (ctx->ra_span_id_message) {
ra_val = flb_ra_get_value_object(ctx->ra_span_id_message, *event->body);
if (ra_val != NULL) {
if(ra_val->o.type == MSGPACK_OBJECT_BIN){
log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->span_id.data) {
memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->span_id.len = ra_val->o.via.bin.size;
}
}else if(ra_val->o.type == MSGPACK_OBJECT_STR){
log_record->span_id.data = flb_calloc(8, sizeof(uint8_t));
if (log_record->span_id.data) {
// Convert to a byte array
uint8_t val[8];
for(size_t count = 0; count < sizeof val/sizeof *val; count++ ){
sscanf(ra_val->o.via.str.ptr, "%2hhx", &val[count]);
ra_val->o.via.str.ptr+=2;
}
memcpy(log_record->span_id.data, val, sizeof(val));
log_record->span_id.len = sizeof(val);
}
}else{
flb_plg_warn(ctx->ins, "Unable to process %s. Unsupported data type.\n", ctx->ra_span_id_message->pattern);
}
flb_ra_key_value_destroy(ra_val);
}
}

/* TraceId */
if (ctx->ra_trace_id_message) {
ra_val = flb_ra_get_value_object(ctx->ra_trace_id_message, *event->body);
if (ra_val != NULL) {
if(ra_val->o.type == MSGPACK_OBJECT_BIN){
log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->trace_id.data) {
memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->trace_id.len = ra_val->o.via.bin.size;
}
}else if(ra_val->o.type == MSGPACK_OBJECT_STR){
log_record->trace_id.data = flb_calloc(16, sizeof(uint8_t));
if (log_record->trace_id.data) {
// Convert from hexdec string to a 16 byte array
uint8_t val[16];
for(size_t count = 0; count < sizeof val/sizeof *val; count++ ){
sscanf(ra_val->o.via.str.ptr, "%2hhx", &val[count]);
ra_val->o.via.str.ptr+=2;
}
memcpy(log_record->trace_id.data, val, sizeof(val));
log_record->trace_id.len = sizeof(val);
}
}else{
flb_plg_warn(ctx->ins, "Unable to process %s. Unsupported data type.\n", ctx->ra_trace_id_message->pattern);
}
flb_ra_key_value_destroy(ra_val);
}
}

return 0;
}

static int process_logs(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
struct flb_input_instance *ins, void *out_context,
Expand Down Expand Up @@ -1158,6 +1262,8 @@ static int process_logs(struct flb_event_chunk *event_chunk,

append_v1_logs_metadata(ctx, &event, &log_records[log_record_count]);

append_v1_logs_message(ctx, &event, &log_records[log_record_count]);

ret = FLB_OK;

log_records[log_record_count].time_unix_nano = flb_time_to_nanosec(&event.timestamp);
Expand Down Expand Up @@ -1544,6 +1650,26 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_resource_metadata_key),
"Specify a Resource key"
},
{
FLB_CONFIG_MAP_STR, "logs_span_id_message_key", "$SpanId",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_span_id_message_key),
"Specify a SpanId key"
},
{
FLB_CONFIG_MAP_STR, "logs_trace_id_message_key", "$TraceId",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_trace_id_message_key),
"Specify a TraceId key"
},
{
FLB_CONFIG_MAP_STR, "logs_severity_text_message_key", "$SeverityText",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_severity_text_message_key),
"Specify a Severity Text key"
},
{
FLB_CONFIG_MAP_STR, "logs_severity_number_message_key", "$SeverityNumber",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_severity_number_message_key),
"Specify a Severity Number key"
},

/* EOF */
{0}
Expand Down
13 changes: 13 additions & 0 deletions plugins/out_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,19 @@ struct opentelemetry_context {
flb_sds_t logs_instrumentation_scope_metadata_key;
flb_sds_t logs_resource_metadata_key;

/* otel body keys */
flb_sds_t logs_span_id_message_key;
struct flb_record_accessor *ra_span_id_message;

flb_sds_t logs_trace_id_message_key;
struct flb_record_accessor *ra_trace_id_message;

flb_sds_t logs_severity_text_message_key;
struct flb_record_accessor *ra_severity_text_message;

flb_sds_t logs_severity_number_message_key;
struct flb_record_accessor *ra_severity_number_message;

/* Number of logs to flush at a time */
int batch_size;

Expand Down
32 changes: 32 additions & 0 deletions plugins/out_opentelemetry/opentelemetry_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,26 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output
if (ctx->ra_attributes_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for attributes");
}
ctx->ra_span_id_message = flb_ra_create((char*)ctx->logs_span_id_message_key,
FLB_FALSE);
if (ctx->ra_span_id_message == NULL) {
flb_plg_error(ins, "failed to create ra for message span id");
}
ctx->ra_trace_id_message = flb_ra_create((char*)ctx->logs_trace_id_message_key,
FLB_FALSE);
if (ctx->ra_trace_id_message == NULL) {
flb_plg_error(ins, "failed to create ra for message trace id");
}
ctx->ra_severity_text_message = flb_ra_create((char*)ctx->logs_severity_text_message_key,
FLB_FALSE);
if (ctx->ra_severity_text_message == NULL) {
flb_plg_error(ins, "failed to create ra for message severity text");
}
ctx->ra_severity_number_message = flb_ra_create((char*)ctx->logs_severity_number_message_key,
FLB_FALSE);
if (ctx->ra_severity_number_message == NULL) {
flb_plg_error(ins, "failed to create ra for message severity number");
}

return ctx;
}
Expand Down Expand Up @@ -466,6 +486,18 @@ void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx)
if (ctx->ra_attributes_metadata) {
flb_ra_destroy(ctx->ra_attributes_metadata);
}
if (ctx->ra_span_id_message) {
flb_ra_destroy(ctx->ra_span_id_message);
}
if (ctx->ra_trace_id_message) {
flb_ra_destroy(ctx->ra_trace_id_message);
}
if (ctx->ra_severity_text_message) {
flb_ra_destroy(ctx->ra_severity_text_message);
}
if (ctx->ra_severity_number_message) {
flb_ra_destroy(ctx->ra_severity_number_message);
}

flb_free(ctx->proxy_host);
flb_free(ctx);
Expand Down

0 comments on commit 4bfcc6b

Please sign in to comment.