diff --git a/plugins/out_azure/azure.c b/plugins/out_azure/azure.c index 905e23f90f9..d4322fb6591 100644 --- a/plugins/out_azure/azure.c +++ b/plugins/out_azure/azure.c @@ -26,6 +26,8 @@ #include #include #include +#include +#include #include #include "azure.h" @@ -47,6 +49,7 @@ static int cb_azure_init(struct flb_output_instance *ins, } static int azure_format(const void *in_buf, size_t in_bytes, + flb_sds_t tag, flb_sds_t *tag_val_out, char **out_buf, size_t *out_size, struct flb_azure *ctx) { @@ -68,6 +71,7 @@ static int azure_format(const void *in_buf, size_t in_bytes, int len; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; + flb_sds_t tmp = NULL; int ret; /* Count number of items */ @@ -97,6 +101,23 @@ static int azure_format(const void *in_buf, size_t in_bytes, map = *log_event.body; map_size = map.via.map.size; + if (ctx->log_type_key) { + tmp = flb_ra_translate(ctx->ra_prefix_key, + tag, flb_sds_len(tag), + map, NULL); + if (!tmp) { + flb_plg_error(ctx->ins, "Tagged record translation failed!"); + } + else if (flb_sds_is_empty(tmp)) { + flb_plg_warn(ctx->ins, "Record accessor key not matched"); + flb_sds_destroy(tmp); + } + else { + /* tag_val_out must be destroyed by the caller */ + *tag_val_out = tmp; + } + } + msgpack_pack_map(&mp_pck, map_size + 1); /* Append the time key */ @@ -160,6 +181,7 @@ static int azure_format(const void *in_buf, size_t in_bytes, } static int build_headers(struct flb_http_client *c, + flb_sds_t log_type, size_t content_length, struct flb_azure *ctx) { @@ -238,7 +260,7 @@ static int build_headers(struct flb_http_client *c, /* Append headers */ flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); flb_http_add_header(c, "Log-Type", 8, - ctx->log_type, flb_sds_len(ctx->log_type)); + log_type, flb_sds_len(log_type)); flb_http_add_header(c, "Content-Type", 12, "application/json", 16); flb_http_add_header(c, "x-ms-date", 9, rfc1123date, flb_sds_len(rfc1123date)); @@ -283,6 +305,7 @@ static void cb_azure_flush(struct flb_event_chunk *event_chunk, struct flb_connection *u_conn; struct flb_http_client *c; flb_sds_t payload; + flb_sds_t final_log_type = NULL; (void) i_ins; (void) config; @@ -294,7 +317,12 @@ static void cb_azure_flush(struct flb_event_chunk *event_chunk, /* Convert binary logs into a JSON payload */ ret = azure_format(event_chunk->data, event_chunk->size, - &buf_data, &buf_size, ctx); + event_chunk->tag, &final_log_type, &buf_data, &buf_size, ctx); + /* If cannot get matching record using log_type_prefix, use log_type directly */ + if (!final_log_type) { + final_log_type = ctx->log_type; + } + if (ret == -1) { flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(FLB_ERROR); @@ -307,7 +335,7 @@ static void cb_azure_flush(struct flb_event_chunk *event_chunk, flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX); /* Append headers and Azure signature */ - ret = build_headers(c, flb_sds_len(payload), ctx); + ret = build_headers(c, final_log_type, flb_sds_len(payload), ctx); if (ret == -1) { flb_plg_error(ctx->ins, "error composing signature"); flb_sds_destroy(payload); @@ -339,6 +367,9 @@ static void cb_azure_flush(struct flb_event_chunk *event_chunk, } /* Cleanup */ + if (final_log_type != ctx->log_type) { + flb_sds_destroy(final_log_type); + } flb_http_client_destroy(c); flb_sds_destroy(payload); flb_upstream_conn_release(u_conn); @@ -380,6 +411,14 @@ static struct flb_config_map config_map[] = { "The name of the event type." }, + { + FLB_CONFIG_MAP_STR, "log_type_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_azure, log_type_key), + "If included, the value for this key will be looked upon in the record " + "and if present, will over-write the `log_type`. If the key/value " + "is not found in the record then the `log_type` option will be used. " + }, + { FLB_CONFIG_MAP_STR, "time_key", FLB_AZURE_TIME_KEY, 0, FLB_TRUE, offsetof(struct flb_azure, time_key), diff --git a/plugins/out_azure/azure.h b/plugins/out_azure/azure.h index c696d39f71b..192d41ac8ea 100644 --- a/plugins/out_azure/azure.h +++ b/plugins/out_azure/azure.h @@ -30,11 +30,13 @@ #include #include #include +#include struct flb_azure { /* account setup */ flb_sds_t customer_id; flb_sds_t log_type; + flb_sds_t log_type_key; flb_sds_t shared_key; flb_sds_t dec_shared_key; @@ -45,6 +47,7 @@ struct flb_azure { /* records */ flb_sds_t time_key; + struct flb_record_accessor *ra_prefix_key; /* time_generated: on/off */ int time_generated; diff --git a/plugins/out_azure/azure_conf.c b/plugins/out_azure/azure_conf.c index d93cac9dc9d..9f8f8a05f92 100644 --- a/plugins/out_azure/azure_conf.c +++ b/plugins/out_azure/azure_conf.c @@ -20,6 +20,8 @@ #include #include #include +#include +#include #include "azure.h" #include "azure_conf.h" @@ -33,6 +35,7 @@ struct flb_azure *flb_azure_conf_create(struct flb_output_instance *ins, const char *tmp; struct flb_upstream *upstream; struct flb_azure *ctx; + struct flb_record_accessor *ra_prefix_key = NULL; /* Allocate config context */ ctx = flb_calloc(1, sizeof(struct flb_azure)); @@ -84,6 +87,20 @@ struct flb_azure *flb_azure_conf_create(struct flb_output_instance *ins, } flb_sds_len_set(ctx->dec_shared_key, olen); + /* config: 'log_type_key' */ + if (ctx->log_type_key) { + ra_prefix_key = flb_ra_create(ctx->log_type_key, FLB_TRUE); + + if (!ra_prefix_key) { + flb_plg_error(ctx->ins, "invalid log_type_key pattern '%s'", ctx->log_type_key); + flb_azure_conf_destroy(ctx); + return NULL; + } + else { + ctx->ra_prefix_key = ra_prefix_key; + } + } + /* Validate hostname given by command line or 'Host' property */ if (!ins->host.name && !ctx->customer_id) { flb_plg_error(ctx->ins, "property 'customer_id' is not defined"); @@ -190,6 +207,9 @@ int flb_azure_conf_destroy(struct flb_azure *ctx) if (ctx->uri) { flb_sds_destroy(ctx->uri); } + if (ctx->ra_prefix_key) { + flb_ra_destroy(ctx->ra_prefix_key); + } if (ctx->u) { flb_upstream_destroy(ctx->u); }