Skip to content

Commit

Permalink
out_azure: improvement: add table prefix support for Azure Log Analyt…
Browse files Browse the repository at this point in the history
…ics plugin (#7663)

---------

Signed-off-by: Kushal Azim Ekram <[email protected]>
  • Loading branch information
kforeverisback authored Sep 23, 2023
1 parent e33759c commit 4759dcc
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 3 deletions.
45 changes: 42 additions & 3 deletions plugins/out_azure/azure.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_record_accessor.h>
#include <fluent-bit/flb_ra_key.h>
#include <msgpack.h>

#include "azure.h"
Expand All @@ -47,6 +49,7 @@ static int cb_azure_init(struct flb_output_instance *ins,
}

static int azure_format(const void *in_buf, size_t in_bytes,
flb_sds_t tag, flb_sds_t *tag_val_out,
char **out_buf, size_t *out_size,
struct flb_azure *ctx)
{
Expand All @@ -68,6 +71,7 @@ static int azure_format(const void *in_buf, size_t in_bytes,
int len;
struct flb_log_event_decoder log_decoder;
struct flb_log_event log_event;
flb_sds_t tmp = NULL;
int ret;

/* Count number of items */
Expand Down Expand Up @@ -97,6 +101,23 @@ static int azure_format(const void *in_buf, size_t in_bytes,
map = *log_event.body;
map_size = map.via.map.size;

if (ctx->log_type_key) {
tmp = flb_ra_translate(ctx->ra_prefix_key,
tag, flb_sds_len(tag),
map, NULL);
if (!tmp) {
flb_plg_error(ctx->ins, "Tagged record translation failed!");
}
else if (flb_sds_is_empty(tmp)) {
flb_plg_warn(ctx->ins, "Record accessor key not matched");
flb_sds_destroy(tmp);
}
else {
/* tag_val_out must be destroyed by the caller */
*tag_val_out = tmp;
}
}

msgpack_pack_map(&mp_pck, map_size + 1);

/* Append the time key */
Expand Down Expand Up @@ -160,6 +181,7 @@ static int azure_format(const void *in_buf, size_t in_bytes,
}

static int build_headers(struct flb_http_client *c,
flb_sds_t log_type,
size_t content_length,
struct flb_azure *ctx)
{
Expand Down Expand Up @@ -238,7 +260,7 @@ static int build_headers(struct flb_http_client *c,
/* Append headers */
flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
flb_http_add_header(c, "Log-Type", 8,
ctx->log_type, flb_sds_len(ctx->log_type));
log_type, flb_sds_len(log_type));
flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
flb_http_add_header(c, "x-ms-date", 9, rfc1123date,
flb_sds_len(rfc1123date));
Expand Down Expand Up @@ -283,6 +305,7 @@ static void cb_azure_flush(struct flb_event_chunk *event_chunk,
struct flb_connection *u_conn;
struct flb_http_client *c;
flb_sds_t payload;
flb_sds_t final_log_type = NULL;
(void) i_ins;
(void) config;

Expand All @@ -294,7 +317,12 @@ static void cb_azure_flush(struct flb_event_chunk *event_chunk,

/* Convert binary logs into a JSON payload */
ret = azure_format(event_chunk->data, event_chunk->size,
&buf_data, &buf_size, ctx);
event_chunk->tag, &final_log_type, &buf_data, &buf_size, ctx);
/* If cannot get matching record using log_type_prefix, use log_type directly */
if (!final_log_type) {
final_log_type = ctx->log_type;
}

if (ret == -1) {
flb_upstream_conn_release(u_conn);
FLB_OUTPUT_RETURN(FLB_ERROR);
Expand All @@ -307,7 +335,7 @@ static void cb_azure_flush(struct flb_event_chunk *event_chunk,
flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX);

/* Append headers and Azure signature */
ret = build_headers(c, flb_sds_len(payload), ctx);
ret = build_headers(c, final_log_type, flb_sds_len(payload), ctx);
if (ret == -1) {
flb_plg_error(ctx->ins, "error composing signature");
flb_sds_destroy(payload);
Expand Down Expand Up @@ -339,6 +367,9 @@ static void cb_azure_flush(struct flb_event_chunk *event_chunk,
}

/* Cleanup */
if (final_log_type != ctx->log_type) {
flb_sds_destroy(final_log_type);
}
flb_http_client_destroy(c);
flb_sds_destroy(payload);
flb_upstream_conn_release(u_conn);
Expand Down Expand Up @@ -380,6 +411,14 @@ static struct flb_config_map config_map[] = {
"The name of the event type."
},

{
FLB_CONFIG_MAP_STR, "log_type_key", NULL,
0, FLB_TRUE, offsetof(struct flb_azure, log_type_key),
"If included, the value for this key will be looked upon in the record "
"and if present, will over-write the `log_type`. If the key/value "
"is not found in the record then the `log_type` option will be used. "
},

{
FLB_CONFIG_MAP_STR, "time_key", FLB_AZURE_TIME_KEY,
0, FLB_TRUE, offsetof(struct flb_azure, time_key),
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_azure/azure.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_record_accessor.h>

struct flb_azure {
/* account setup */
flb_sds_t customer_id;
flb_sds_t log_type;
flb_sds_t log_type_key;
flb_sds_t shared_key;
flb_sds_t dec_shared_key;

Expand All @@ -45,6 +47,7 @@ struct flb_azure {

/* records */
flb_sds_t time_key;
struct flb_record_accessor *ra_prefix_key;

/* time_generated: on/off */
int time_generated;
Expand Down
20 changes: 20 additions & 0 deletions plugins/out_azure/azure_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_base64.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_record_accessor.h>
#include <fluent-bit/flb_pack.h>

#include "azure.h"
#include "azure_conf.h"
Expand All @@ -33,6 +35,7 @@ struct flb_azure *flb_azure_conf_create(struct flb_output_instance *ins,
const char *tmp;
struct flb_upstream *upstream;
struct flb_azure *ctx;
struct flb_record_accessor *ra_prefix_key = NULL;

/* Allocate config context */
ctx = flb_calloc(1, sizeof(struct flb_azure));
Expand Down Expand Up @@ -84,6 +87,20 @@ struct flb_azure *flb_azure_conf_create(struct flb_output_instance *ins,
}
flb_sds_len_set(ctx->dec_shared_key, olen);

/* config: 'log_type_key' */
if (ctx->log_type_key) {
ra_prefix_key = flb_ra_create(ctx->log_type_key, FLB_TRUE);

if (!ra_prefix_key) {
flb_plg_error(ctx->ins, "invalid log_type_key pattern '%s'", ctx->log_type_key);
flb_azure_conf_destroy(ctx);
return NULL;
}
else {
ctx->ra_prefix_key = ra_prefix_key;
}
}

/* Validate hostname given by command line or 'Host' property */
if (!ins->host.name && !ctx->customer_id) {
flb_plg_error(ctx->ins, "property 'customer_id' is not defined");
Expand Down Expand Up @@ -190,6 +207,9 @@ int flb_azure_conf_destroy(struct flb_azure *ctx)
if (ctx->uri) {
flb_sds_destroy(ctx->uri);
}
if (ctx->ra_prefix_key) {
flb_ra_destroy(ctx->ra_prefix_key);
}
if (ctx->u) {
flb_upstream_destroy(ctx->u);
}
Expand Down

0 comments on commit 4759dcc

Please sign in to comment.