diff --git a/plugins/processor_opentelemetry_envelope/otel_envelope.c b/plugins/processor_opentelemetry_envelope/otel_envelope.c index c83fbb8f328..82c22084b0c 100644 --- a/plugins/processor_opentelemetry_envelope/otel_envelope.c +++ b/plugins/processor_opentelemetry_envelope/otel_envelope.c @@ -206,6 +206,112 @@ static int cb_process_logs(struct flb_processor_instance *ins, return FLB_PROCESSOR_SUCCESS; } +static int metrics_add_kvlist(struct cfl_kvlist *kvlist, char *kv1, char *sub_kv1, char *sub_kv2) +{ + int ret; + struct cfl_variant *var; + struct cfl_kvlist *tmp_kvlist; + + var = cfl_kvlist_fetch(kvlist, kv1); + if (!var) { + tmp_kvlist = cfl_kvlist_create(); + if (!tmp_kvlist) { + return -1; + } + ret = cfl_kvlist_insert_kvlist(kvlist, kv1, tmp_kvlist); + if (ret != 0) { + return -1; + } + + /* retrieve the last kv inserted */ + var = cfl_kvlist_fetch(kvlist, kv1); + } + else if (var->type != CFL_VARIANT_KVLIST) { + return -1; + } + + if (!var) { + return -1; + } + + if (sub_kv1) { + ret = metrics_add_kvlist(var->data.as_kvlist, sub_kv1, NULL, NULL); + if (ret != 0) { + return -1; + } + } + + if (sub_kv2) { + ret = metrics_add_kvlist(var->data.as_kvlist, sub_kv2, NULL, NULL); + if (ret != 0) { + return -1; + } + } + + return 0; +} + +static int cb_process_metrics(struct flb_processor_instance *processor_instance, + struct cmt *cmt, + struct cmt **out_context, + const char *tag, int tag_len) +{ + (void) out_context; + (void) tag; + (void) tag_len; + int ret; + struct cfl_variant *var = NULL; + + /* Check internal metadata, look for some producer, if no one is set, add it */ + if (!cmt->internal_metadata) { + cmt->internal_metadata = cfl_kvlist_create(); + if (!cmt->internal_metadata) { + return FLB_PROCESSOR_FAILURE; + } + } + else { + var = cfl_kvlist_fetch(cmt->internal_metadata, "producer"); + } + if (!var) { + cfl_kvlist_insert_string(cmt->internal_metadata, "producer", "fluent-bit"); + } + + /* externl metadata */ + if (!cmt->external_metadata) { + cmt->external_metadata = cfl_kvlist_create(); + if (!cmt->external_metadata) { + return FLB_PROCESSOR_FAILURE; + } + } + + /* scope */ + ret = metrics_add_kvlist(cmt->external_metadata, "scope", "metadata", "attributes"); + if (ret != 0) { + return FLB_PROCESSOR_FAILURE; + } + + /* scope_metrics */ + ret = metrics_add_kvlist(cmt->external_metadata, "scope_metrics", "metadata", NULL); + if (ret != 0) { + return FLB_PROCESSOR_FAILURE; + } + + /* resource */ + ret = metrics_add_kvlist(cmt->external_metadata, "resource", "metadata", "attributes"); + if (ret != 0) { + return FLB_PROCESSOR_FAILURE; + } + + /* resource_metrics */ + ret = metrics_add_kvlist(cmt->external_metadata, "resource_metrics", "metadata", NULL); + if (ret != 0) { + return FLB_PROCESSOR_FAILURE; + } + + *out_context = NULL; + return FLB_PROCESSOR_SUCCESS; +} + static struct flb_config_map config_map[] = { /* EOF */ {0} @@ -216,7 +322,7 @@ struct flb_processor_plugin processor_opentelemetry_envelope_plugin = { .description = "Package log records inside an OpenTelemetry Logs schema", .cb_init = cb_init, .cb_process_logs = cb_process_logs, - .cb_process_metrics = NULL, + .cb_process_metrics = cb_process_metrics, .cb_process_traces = NULL, .cb_exit = cb_exit, .config_map = config_map,