diff --git a/plugins/out_loki/loki.c b/plugins/out_loki/loki.c index 57eb4f681b5..d93a3f9aad6 100644 --- a/plugins/out_loki/loki.c +++ b/plugins/out_loki/loki.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -905,6 +906,7 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins, int io_flags = 0; struct flb_loki *ctx; struct flb_upstream *upstream; + char *compress; /* Create context */ ctx = flb_calloc(1, sizeof(struct flb_loki)); @@ -951,6 +953,15 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins, } } + /* Compress (gzip) */ + compress = (char *) flb_output_get_property("compress", ins); + ctx->compress_gzip = FLB_FALSE; + if (compress) { + if (strcasecmp(compress, "gzip") == 0) { + ctx->compress_gzip = FLB_TRUE; + } + } + /* Line Format */ if (strcasecmp(ctx->line_format, "json") == 0) { ctx->out_line_format = FLB_LOKI_FMT_JSON; @@ -1477,6 +1488,16 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx, return json; } +static void payload_release(void *payload, int compressed) +{ + if (compressed) { + flb_free(payload); + } + else { + flb_sds_destroy(payload); + } +} + static void cb_loki_flush(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, struct flb_input_instance *i_ins, @@ -1487,6 +1508,9 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk, int out_ret = FLB_OK; size_t b_sent; flb_sds_t payload = NULL; + flb_sds_t out_buf = NULL; + size_t out_size; + int compressed = FLB_FALSE; struct flb_loki *ctx = out_context; struct flb_connection *u_conn; struct flb_http_client *c; @@ -1520,31 +1544,48 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk, flb_sds_len(event_chunk->tag), event_chunk->data, event_chunk->size, &dynamic_tenant_id->value); + if (!payload) { flb_plg_error(ctx->ins, "cannot compose request payload"); FLB_OUTPUT_RETURN(FLB_RETRY); } + /* Map buffer */ + out_buf = payload; + out_size = flb_sds_len(payload); + + if (ctx->compress_gzip == FLB_TRUE) { + ret = flb_gzip_compress((void *) payload, flb_sds_len(payload), (void **) &out_buf, &out_size); + if (ret == -1) { + flb_plg_error(ctx->ins, + "cannot gzip payload, disabling compression"); + } else { + compressed = FLB_TRUE; + /* payload is not longer needed */ + flb_sds_destroy(payload); + } + } + /* Lookup an available connection context */ u_conn = flb_upstream_conn_get(ctx->u); if (!u_conn) { flb_plg_error(ctx->ins, "no upstream connections available"); - flb_sds_destroy(payload); + payload_release(out_buf, compressed); FLB_OUTPUT_RETURN(FLB_RETRY); } /* Create HTTP client context */ c = flb_http_client(u_conn, FLB_HTTP_POST, FLB_LOKI_URI, - payload, flb_sds_len(payload), + out_buf, out_size, ctx->tcp_host, ctx->tcp_port, NULL, 0); if (!c) { flb_plg_error(ctx->ins, "cannot create HTTP client context"); - flb_sds_destroy(payload); + payload_release(out_buf, compressed); flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(FLB_RETRY); @@ -1568,6 +1609,10 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk, FLB_LOKI_CT, sizeof(FLB_LOKI_CT) - 1, FLB_LOKI_CT_JSON, sizeof(FLB_LOKI_CT_JSON) - 1); + if (compressed == FLB_TRUE) { + flb_http_set_content_encoding_gzip(c); + } + /* Add X-Scope-OrgID header */ if (dynamic_tenant_id->value != NULL) { flb_http_add_header(c, @@ -1583,7 +1628,7 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk, /* Send HTTP request */ ret = flb_http_do(c, &b_sent); - flb_sds_destroy(payload); + payload_release(out_buf, compressed); /* Validate HTTP client return status */ if (ret == 0) { @@ -1760,6 +1805,12 @@ static struct flb_config_map config_map[] = { "Set bearer token auth" }, + { + FLB_CONFIG_MAP_STR, "compress", NULL, + 0, FLB_FALSE, 0, + "Set payload compression in network transfer. Option available is 'gzip'" + }, + /* EOF */ {0} }; diff --git a/plugins/out_loki/loki.h b/plugins/out_loki/loki.h index 39ffed8d8a9..2011cee3ded 100644 --- a/plugins/out_loki/loki.h +++ b/plugins/out_loki/loki.h @@ -58,6 +58,7 @@ struct flb_loki { flb_sds_t line_format; flb_sds_t tenant_id; flb_sds_t tenant_id_key_config; + int compress_gzip; /* HTTP Auth */ flb_sds_t http_user;