Skip to content

Commit

Permalink
in_opentelemetry: logs: add support for metadata in JSON payload
Browse files Browse the repository at this point in the history
This change adds support to parse and register the resources and scope metadata
coming from Logs which are ingested from a JSON payload.

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Dec 14, 2024
1 parent 7d3b7cb commit e6e1213
Showing 1 changed file with 203 additions and 29 deletions.
232 changes: 203 additions & 29 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
msgpack_pack_uint64(&mp_pck, resource->dropped_attributes_count);
}

flb_mp_map_header_end(&mh_tmp);


if (resource_log->schema_url) {
flb_mp_map_header_append(&mh);
Expand Down Expand Up @@ -781,13 +781,23 @@ static int find_map_entry_by_key(msgpack_object_map *map,
size_t match_index,
int case_insensitive)
{
size_t match_count;
int result;
int index;
int key_len;
size_t match_count;

if (!key) {
return -1;
}

key_len = strlen(key);
match_count = 0;

for (index = 0 ; index < (int) map->size ; index++) {
if (key_len != map->ptr[index].key.via.str.size) {
continue;
}

if (map->ptr[index].key.type == MSGPACK_OBJECT_STR) {
if (case_insensitive) {
result = strncasecmp(map->ptr[index].key.via.str.ptr,
Expand Down Expand Up @@ -1229,6 +1239,8 @@ static int process_json_payload_log_records_entry(
int body_type;
struct flb_time timestamp;
int result;
msgpack_object *severity_number = NULL;
msgpack_object *severity_text = NULL;

if (log_records_object->type != MSGPACK_OBJECT_MAP) {
flb_plg_error(ctx->ins, "unexpected logRecords entry type");
Expand Down Expand Up @@ -1288,12 +1300,28 @@ static int process_json_payload_log_records_entry(
flb_time_from_uint64(&timestamp, timestamp_uint64);
}

/* severityNumber */
result = find_map_entry_by_key(log_records_entry, "severityNumber", 0, FLB_TRUE);
if (result == -1) {
result = find_map_entry_by_key(log_records_entry, "severity_number", 0, FLB_TRUE);
}
if (result >= 0) {
severity_number = &log_records_entry->ptr[result].val;
}

/* severityText */
result = find_map_entry_by_key(log_records_entry, "severityText", 0, FLB_TRUE);
if (result == -1) {
result = find_map_entry_by_key(log_records_entry, "severity_text", 0, FLB_TRUE);
}
if (result >= 0) {
severity_text = &log_records_entry->ptr[result].val;
}


result = find_map_entry_by_key(log_records_entry, "attributes", 0, FLB_TRUE);

if (result == -1) {
flb_plg_debug(ctx->ins, "attributes missing");

metadata_object = NULL;
}
else {
Expand Down Expand Up @@ -1329,15 +1357,33 @@ static int process_json_payload_log_records_entry(
result = flb_log_event_encoder_set_timestamp(encoder, &timestamp);
}

if (result == FLB_EVENT_ENCODER_SUCCESS &&
metadata_object != NULL) {
flb_log_event_encoder_dynamic_field_reset(&encoder->metadata);
flb_log_event_encoder_dynamic_field_reset(&encoder->metadata);
result = flb_log_event_encoder_begin_map(encoder, FLB_LOG_EVENT_METADATA);
if (result == FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_append_string(encoder, FLB_LOG_EVENT_METADATA, ctx->logs_metadata_key, flb_sds_len(ctx->logs_metadata_key));
flb_log_event_encoder_begin_map(encoder, FLB_LOG_EVENT_METADATA);

if (severity_number != NULL) {
flb_log_event_encoder_append_metadata_values(encoder,
FLB_LOG_EVENT_STRING_VALUE("severity_number", 15),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(severity_number));
}

if (severity_text != NULL && severity_text->type == MSGPACK_OBJECT_STR) {
flb_log_event_encoder_append_metadata_values(encoder,
FLB_LOG_EVENT_STRING_VALUE("severity_text", 13),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(severity_text));
}

if (metadata_object != NULL) {
flb_log_event_encoder_append_string(encoder, FLB_LOG_EVENT_METADATA, "attributes", 10);
result = json_payload_append_converted_kvlist(encoder, FLB_LOG_EVENT_METADATA, metadata_object);
}

flb_log_event_encoder_commit_map(encoder, FLB_LOG_EVENT_METADATA);

result = json_payload_append_converted_kvlist(
encoder,
FLB_LOG_EVENT_METADATA,
metadata_object);
}
flb_log_event_encoder_commit_map(encoder, FLB_LOG_EVENT_METADATA);

if (result == FLB_EVENT_ENCODER_SUCCESS &&
body_object != NULL) {
Expand Down Expand Up @@ -1400,7 +1446,6 @@ static int process_json_payload_scope_logs_entry(

if (result == -1) {
flb_plg_error(ctx->ins, "scopeLogs missing");

return -3;
}
}
Expand All @@ -1425,31 +1470,48 @@ static int process_json_payload_scope_logs_entry(
return result;
}


static int process_json_payload_resource_logs_entry(
struct flb_opentelemetry *ctx,
struct flb_log_event_encoder *encoder,
msgpack_object *resource_logs_object)
struct flb_opentelemetry *ctx,
struct flb_log_event_encoder *encoder,
size_t resource_logs_index,
msgpack_object *resource_logs_object)
{
msgpack_object_map *resource_logs_entry;
int ret;
int result;
size_t index;
msgpack_object obj;
msgpack_object_map *resource = NULL;
msgpack_object *resource_attr = NULL;
msgpack_object_map *resource_logs_entry = NULL;
msgpack_object *scope = NULL;
msgpack_object_array *scope_logs;
int result;
size_t index;


if (resource_logs_object->type != MSGPACK_OBJECT_MAP) {
flb_plg_error(ctx->ins, "unexpected resourceLogs entry type");

return -2;
}

resource_logs_entry = &resource_logs_object->via.map;
/* get 'resource' and resource['attributes'] */
result = find_map_entry_by_key(&resource_logs_object->via.map, "resource", 0, FLB_TRUE);
if (result >= 0) {
obj = resource_logs_object->via.map.ptr[result].val;
if (obj.type == MSGPACK_OBJECT_MAP) {
resource = &obj.via.map;
result = find_map_entry_by_key(resource, "attributes", 0, FLB_TRUE);
if (result >= 0) {
obj = resource->ptr[result].val;
if (obj.type == MSGPACK_OBJECT_ARRAY) {
resource_attr = &obj;
}
}
}
}

resource_logs_entry = &resource_logs_object->via.map;
result = find_map_entry_by_key(resource_logs_entry, "scopeLogs", 0, FLB_TRUE);

if (result == -1) {
result = find_map_entry_by_key(resource_logs_entry, "scope_logs", 0, FLB_TRUE);

if (result == -1) {
flb_plg_error(ctx->ins, "scopeLogs missing");

Expand All @@ -1459,19 +1521,130 @@ static int process_json_payload_resource_logs_entry(

if (resource_logs_entry->ptr[result].val.type != MSGPACK_OBJECT_ARRAY) {
flb_plg_error(ctx->ins, "unexpected scopeLogs type");

return -2;
}

scope_logs = &resource_logs_entry->ptr[result].val.via.array;

result = 0;

for (index = 0 ; index < scope_logs->size ; index++) {
/*
* Add the information about OTLP metadata, we do this by registering
* a group-type record.
*/
flb_log_event_encoder_group_init(encoder);

/* pack internal schema */
ret = flb_log_event_encoder_append_metadata_values(encoder,
FLB_LOG_EVENT_STRING_VALUE("schema", 6),
FLB_LOG_EVENT_STRING_VALUE("otlp", 4),
FLB_LOG_EVENT_STRING_VALUE("resource_id", 11),
FLB_LOG_EVENT_INT64_VALUE(resource_logs_index),
FLB_LOG_EVENT_STRING_VALUE("scope_id", 8),
FLB_LOG_EVENT_INT64_VALUE(index));
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(ctx->ins, "could not set group content metadata");
return -2;
}

/* Resource key */
flb_log_event_encoder_append_body_string(encoder, "resource", 8);

/* start resource value (map) */
flb_log_event_encoder_body_begin_map(encoder);

/* Check if we have OTel resource attributes */
if (resource_attr) {
flb_log_event_encoder_append_body_string(encoder, "attributes", 10);
result = json_payload_append_converted_kvlist(encoder,
FLB_LOG_EVENT_BODY,
resource_attr);
}

/* resource dropped_attributers_count */
result = find_map_entry_by_key(resource, "droppedAttributesCount", 0, FLB_TRUE);
if (result >= 0) {
obj = resource->ptr[result].val;
flb_log_event_encoder_append_body_values(encoder,
FLB_LOG_EVENT_CSTRING_VALUE("dropped_attributes_count"),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj));
}

/* close resource map */
flb_log_event_encoder_body_commit_map(encoder);

/* scope metadata */
scope = NULL;
obj = scope_logs->ptr[index];
if (obj.type == MSGPACK_OBJECT_MAP) {
result = find_map_entry_by_key(&obj.via.map, "scope", 0, FLB_TRUE);
if (result >= 0) {
if (obj.via.map.ptr[result].val.type == MSGPACK_OBJECT_MAP) {
scope = &obj.via.map.ptr[result].val;
}
}
}

if (scope) {
/*
* if the scope is found, process every expected key one by one to avoid
* wrongly ingested items.
*/

/* append scope key */
flb_log_event_encoder_append_body_string(encoder, "scope", 5);

/* scope map value */
flb_log_event_encoder_body_begin_map(encoder);

/* scope name */
result = find_map_entry_by_key(&scope->via.map, "name", 0, FLB_TRUE);
if (result >= 0) {
obj = scope->via.map.ptr[result].val;
if (obj.type == MSGPACK_OBJECT_STR) {
flb_log_event_encoder_append_body_values(encoder,
FLB_LOG_EVENT_CSTRING_VALUE("name"),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj));
}
}

/* scope version */
result = find_map_entry_by_key(&scope->via.map, "version", 0, FLB_TRUE);
if (result >= 0) {
obj = scope->via.map.ptr[result].val;
if (obj.type == MSGPACK_OBJECT_STR) {
flb_log_event_encoder_append_body_values(encoder,
FLB_LOG_EVENT_CSTRING_VALUE("version"),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj));
}
}

/* scope attributes */
result = find_map_entry_by_key(&scope->via.map, "attributes", 0, FLB_TRUE);
if (result >= 0) {
obj = scope->via.map.ptr[result].val;
if (obj.type == MSGPACK_OBJECT_ARRAY) {
flb_log_event_encoder_append_body_string(encoder, "attributes", 10);
result = json_payload_append_converted_kvlist(encoder,
FLB_LOG_EVENT_BODY,
&obj);
if (result != 0) {
return -2;
}
}
}

flb_log_event_encoder_commit_map(encoder, FLB_LOG_EVENT_BODY);
}

flb_log_event_encoder_commit_map(encoder, FLB_LOG_EVENT_BODY);

flb_log_event_encoder_group_header_end(encoder);

result = process_json_payload_scope_logs_entry(
ctx,
encoder,
&scope_logs->ptr[index]);
ctx,
encoder,
&scope_logs->ptr[index]);
flb_log_event_encoder_group_end(encoder);
}

return result;
Expand Down Expand Up @@ -1520,6 +1693,7 @@ static int process_json_payload_root(struct flb_opentelemetry *ctx,
result = process_json_payload_resource_logs_entry(
ctx,
encoder,
index,
&resource_logs->ptr[index]);
}

Expand Down

0 comments on commit e6e1213

Please sign in to comment.