Skip to content

Commit

Permalink
out_azure_kusto : fix multiple files tail issue and timeout issue (#8430
Browse files Browse the repository at this point in the history
)

- added kusto specific headers
- randomized kusto ingestion resources refresh interval
- made kusto ingestion resources refresh interval configurable
- introduced gzip compression for payload
 -added dynamic parsing of azure kusto ingestion resources
- fixed deadlock and added granular locks
- added default kusto endpoints connection timeout interval configs

---------

Signed-off-by: Tanmaya Panda <[email protected]>
Co-authored-by: root <root@fluentbitvm.m2d2gsu3tjfejnx4nkwynra25e.bx.internal.cloudapp.net>
Co-authored-by: Ramachandran A G <[email protected]>
  • Loading branch information
3 people authored Nov 26, 2024
1 parent faf7da1 commit 557b0b5
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 43 deletions.
68 changes: 65 additions & 3 deletions plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
#include <fluent-bit/flb_oauth2.h>
#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_signv4.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_version.h>

#include "azure_kusto.h"
#include "azure_kusto_conf.h"
Expand Down Expand Up @@ -126,11 +127,21 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs
struct flb_http_client *c;
flb_sds_t resp = NULL;

flb_plg_debug(ctx->ins, "before getting upstream connection");

flb_plg_debug(ctx->ins, "Logging attributes of flb_azure_kusto_resources:");
flb_plg_debug(ctx->ins, "blob_ha: %p", ctx->resources->blob_ha);
flb_plg_debug(ctx->ins, "queue_ha: %p", ctx->resources->queue_ha);
flb_plg_debug(ctx->ins, "load_time: %lu", ctx->resources->load_time);

ctx->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout ;

/* Get upstream connection */
u_conn = flb_upstream_conn_get(ctx->u);

if (u_conn) {
token = get_azure_kusto_token(ctx);
flb_plg_debug(ctx->ins, "after get azure kusto token");

if (token) {
/* Compose request body */
Expand All @@ -152,6 +163,9 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs
flb_http_add_header(c, "Accept", 6, "application/json", 16);
flb_http_add_header(c, "Authorization", 13, token,
flb_sds_len(token));
flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR));
flb_http_add_header(c, "x-ms-app", 8, "Fluent-Bit", 10);
flb_http_add_header(c, "x-ms-user", 9, "Fluent-Bit", 10);
flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10);

/* Send HTTP request */
Expand Down Expand Up @@ -231,6 +245,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
*/
pthread_mutex_init(&ctx->token_mutex, NULL);
pthread_mutex_init(&ctx->resources_mutex, NULL);
pthread_mutex_init(&ctx->blob_mutex, NULL);

/*
* Create upstream context for Kusto Ingestion endpoint
Expand All @@ -250,6 +265,8 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
}
flb_output_upstream_set(ctx->u, ins);

flb_plg_debug(ctx->ins, "azure kusto init completed");

return 0;
}

Expand Down Expand Up @@ -367,16 +384,21 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
size_t json_size;
size_t tag_len;
struct flb_azure_kusto *ctx = out_context;
int is_compressed = FLB_FALSE;

(void)i_ins;
(void)config;

void *final_payload = NULL;
size_t final_payload_size = 0;

flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size);

tag_len = flb_sds_len(event_chunk->tag);

/* Load or refresh ingestion resources */
ret = azure_kusto_load_ingestion_resources(ctx, config);
flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot load ingestion resources");
FLB_OUTPUT_RETURN(FLB_RETRY);
Expand All @@ -385,12 +407,32 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
/* Reformat msgpack to JSON payload */
ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data,
event_chunk->size, (void **)&json, &json_size);
flb_plg_trace(ctx->ins, "format: ret=%d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot reformat data into json");
FLB_OUTPUT_RETURN(FLB_RETRY);
}

ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size);
/* Map buffer */
final_payload = json;
final_payload_size = json_size;
if (ctx->compression_enabled == FLB_TRUE) {
ret = flb_gzip_compress((void *) json, json_size,
&final_payload, &final_payload_size);
if (ret != 0) {
flb_plg_error(ctx->ins,
"cannot gzip payload");
flb_sds_destroy(json);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
else {
is_compressed = FLB_TRUE;
/* JSON buffer will be cleared at cleanup: */
}
}
flb_plg_trace(ctx->ins, "payload size before compression %zu & after compression %zu ", json_size ,final_payload_size);
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size);
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot perform queued ingestion");
flb_sds_destroy(json);
Expand All @@ -400,6 +442,10 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
/* Cleanup */
flb_sds_destroy(json);

/* release compressed payload */
if (is_compressed == FLB_TRUE) {
flb_free(final_payload);
}
/* Done */
FLB_OUTPUT_RETURN(FLB_OK);
}
Expand All @@ -417,6 +463,10 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config)
ctx->u = NULL;
}

pthread_mutex_destroy(&ctx->resources_mutex);
pthread_mutex_destroy(&ctx->token_mutex);
pthread_mutex_destroy(&ctx->blob_mutex);

flb_azure_kusto_conf_destroy(ctx);

return 0;
Expand Down Expand Up @@ -462,7 +512,19 @@ static struct flb_config_map config_map[] = {
offsetof(struct flb_azure_kusto, time_key),
"The key name of the time. If 'include_time_key' is false, "
"This property is ignored"},
/* EOF */
{FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout),
"Set the connection timeout of various kusto endpoints (kusto ingest endpoint, kusto ingestion blob endpoint, kusto ingestion queue endpoint) in seconds."
"The default is 60 seconds."},
{FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, compression_enabled),
"Enable HTTP payload compression (gzip)."
"The default is true."},
{FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE,
offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval),
"Set the azure kusto ingestion resources refresh interval"
"The default is 3600 seconds."},
/* EOF */
{0}};

struct flb_output_plugin out_azure_kusto_plugin = {
Expand Down
14 changes: 13 additions & 1 deletion plugins/out_azure_kusto/azure_kusto.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@
#define AZURE_KUSTO_RESOURCE_UPSTREAM_URI "uri"
#define AZURE_KUSTO_RESOURCE_UPSTREAM_SAS "sas"

#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC 3600
#define FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC "3600"

#define FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT "60"


struct flb_azure_kusto_resources {
struct flb_upstream_ha *blob_ha;
Expand All @@ -70,6 +73,13 @@ struct flb_azure_kusto {
flb_sds_t table_name;
flb_sds_t ingestion_mapping_reference;

int ingestion_endpoint_connect_timeout;

/* compress payload */
int compression_enabled;

int ingestion_resources_refresh_interval;

/* records configuration */
flb_sds_t log_key;
int include_tag_key;
Expand All @@ -94,6 +104,8 @@ struct flb_azure_kusto {
/* mutex for loading reosurces */
pthread_mutex_t resources_mutex;

pthread_mutex_t blob_mutex;

/* Upstream connection to the backend server */
struct flb_upstream *u;

Expand Down
75 changes: 51 additions & 24 deletions plugins/out_azure_kusto/azure_kusto_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi
{
jsmn_parser parser;
jsmntok_t *t;
jsmntok_t *tokens;
int tok_size = 100;
jsmntok_t *tokens = NULL;
int ret = -1;
int i;
int blob_count = 0;
Expand Down Expand Up @@ -200,10 +199,18 @@ static int parse_storage_resources(struct flb_azure_kusto *ctx, struct flb_confi
}

jsmn_init(&parser);
tokens = flb_calloc(1, sizeof(jsmntok_t) * tok_size);

/* Dynamically allocate memory for tokens based on response length */
tokens = flb_calloc(1, sizeof(jsmntok_t) * (flb_sds_len(response)));

if (!tokens) {
flb_errno(); /* Log the error using flb_errno() */
flb_plg_error(ctx->ins, "failed to allocate memory for tokens");
return -1;
}

if (tokens) {
ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, tok_size);
ret = jsmn_parse(&parser, response, flb_sds_len(response), tokens, flb_sds_len(response));

if (ret > 0) {
/* skip all tokens until we reach "Rows" */
Expand Down Expand Up @@ -417,6 +424,24 @@ static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx,
return identity_token;
}



/**
* This method returns random integers from range -600 to +600 which needs to be added
* to the kusto ingestion resources refresh interval to even out the spikes
* in kusto DM for .get ingestion resources upon expiry
* */
int azure_kusto_generate_random_integer() {
/* Seed the random number generator */
int pid = getpid();
unsigned long address = (unsigned long)&address;
unsigned int seed = pid ^ (address & 0xFFFFFFFF) * time(0);
srand(seed);
/* Generate a random integer in the range [-600, 600] */
int random_integer = rand() % 1201 - 600;
return random_integer;
}

int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
struct flb_config *config)
{
Expand All @@ -427,22 +452,20 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
struct flb_upstream_ha *queue_ha = NULL;
time_t now;

if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}
int generated_random_integer = azure_kusto_generate_random_integer();
flb_plg_debug(ctx->ins, "generated random integer is %d", generated_random_integer);

now = time(NULL);

/* check if we have all resources and they are not stale */
if (ctx->resources->blob_ha && ctx->resources->queue_ha &&
ctx->resources->identity_token &&
now - ctx->resources->load_time < FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC) {
now - ctx->resources->load_time < ctx->ingestion_resources_refresh_interval + generated_random_integer) {
flb_plg_debug(ctx->ins, "resources are already loaded and are not stale");
ret = 0;
}
else {
flb_plg_info(ctx->ins, "loading kusto ingestion resourcs");
flb_plg_info(ctx->ins, "loading kusto ingestion resourcs and refresh interval is %d", ctx->ingestion_resources_refresh_interval + generated_random_integer);
response = execute_ingest_csl_command(ctx, ".get ingestion resources");

if (response) {
Expand All @@ -452,9 +475,19 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
blob_ha = flb_upstream_ha_create("azure_kusto_blob_ha");

if (blob_ha) {

if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}
ret =
parse_storage_resources(ctx, config, response, blob_ha, queue_ha);

if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}

if (ret == 0) {
flb_sds_destroy(response);
response = NULL;
Expand All @@ -463,31 +496,30 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
execute_ingest_csl_command(ctx, ".get kusto identity token");

if (response) {
if (pthread_mutex_lock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return -1;
}
identity_token =
parse_ingestion_identity_token(ctx, response);

if (identity_token) {
ret = flb_azure_kusto_resources_clear(ctx->resources);

if (ret != -1) {
ctx->resources->blob_ha = blob_ha;
ctx->resources->queue_ha = queue_ha;
ctx->resources->identity_token = identity_token;
ctx->resources->load_time = now;

ret = 0;
}
else {
flb_plg_error(
ctx->ins,
"error destroying previous ingestion resources");
}
}
else {
flb_plg_error(ctx->ins,
"error parsing ingestion identity token");
ret = -1;
}
if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}
}
else {
flb_plg_error(ctx->ins, "error getting kusto identity token");
Expand Down Expand Up @@ -526,11 +558,6 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
}
}

if (pthread_mutex_unlock(&ctx->resources_mutex)) {
flb_plg_error(ctx->ins, "error unlocking mutex");
return -1;
}

return ret;
}

Expand Down
Loading

0 comments on commit 557b0b5

Please sign in to comment.