diff --git a/include/fluent-bit/flb_lib.h b/include/fluent-bit/flb_lib.h index 88ff8cb0af3..c97b1a93c3e 100644 --- a/include/fluent-bit/flb_lib.h +++ b/include/fluent-bit/flb_lib.h @@ -22,6 +22,7 @@ #include #include +#include /* Lib engine status */ #define FLB_LIB_ERROR -1 @@ -69,7 +70,14 @@ FLB_EXPORT int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name, void (*out_callback) (void *, int, int, void *, size_t, void *), void *out_callback_data, - void *test_ctx); + void *flush_ctx); +FLB_EXPORT int flb_output_set_test_flush_ctx_callback(flb_ctx_t *ctx, int ffd, + char *test_name, + void *(*flush_ctx_callback)( + struct flb_config *, + struct flb_input_instance *, + void *, void *), + void *flush_ctx); FLB_EXPORT int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name, void (*cb)(char *, void *, void *)); FLB_EXPORT int flb_output_set_http_test(flb_ctx_t *ctx, int ffd, char *test_name, diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index c8b007806f2..bfa1cd0ee2b 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -143,9 +143,24 @@ struct flb_test_out_formatter { */ void *rt_data; - /* optional context for flush callback */ + /* optional context for "flush context callback" */ void *flush_ctx; + /* + * Callback + * ========= + * Optional "flush context callback": it references the function that extracts + * optional flush context for "formatter callback". 
+ */ + void *(*flush_ctx_callback) (/* Fluent Bit context */ + struct flb_config *, + /* plugin that ingested the records */ + struct flb_input_instance *, + /* plugin instance context */ + void *plugin_context, + /* context for "flush context callback" */ + void *flush_ctx); + /* * Callback * ========= diff --git a/plugins/out_es/CMakeLists.txt b/plugins/out_es/CMakeLists.txt index 4fad4f27cec..39aac4b8e50 100644 --- a/plugins/out_es/CMakeLists.txt +++ b/plugins/out_es/CMakeLists.txt @@ -1,5 +1,6 @@ set(src es_bulk.c + es_conf_parse.c es_conf.c es.c murmur3.c) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index c896b1ed258..9b8cd1b7e3d 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -17,6 +17,9 @@ * limitations under the License. */ +#include +#include + #include #include #include @@ -24,17 +27,15 @@ #include #include #include -#include #include #include #include #include -#include - -#include +#include #include "es.h" #include "es_conf.h" +#include "es_conf_prop.h" #include "es_bulk.h" #include "murmur3.h" @@ -42,11 +43,12 @@ struct flb_output_plugin out_es_plugin; static int es_pack_array_content(msgpack_packer *tmp_pck, msgpack_object array, - struct flb_elasticsearch *ctx); + struct flb_elasticsearch_config *ec); #ifdef FLB_HAVE_AWS static flb_sds_t add_aws_auth(struct flb_http_client *c, - struct flb_elasticsearch *ctx) + struct flb_elasticsearch *ctx, + struct flb_elasticsearch_config *ec) { flb_sds_t signature = NULL; int ret; @@ -64,9 +66,9 @@ static flb_sds_t add_aws_auth(struct flb_http_client *c, flb_http_add_header(c, "User-Agent", 10, "aws-fluent-bit-plugin", 21); signature = flb_signv4_do(c, FLB_TRUE, FLB_TRUE, time(NULL), - ctx->aws_region, ctx->aws_service_name, - S3_MODE_SIGNED_PAYLOAD, ctx->aws_unsigned_headers, - ctx->aws_provider); + ec->aws_region, ec->aws_service_name, + S3_MODE_SIGNED_PAYLOAD, ec->aws_unsigned_headers, + ec->aws_provider); if (!signature) { flb_plg_error(ctx->ins, "could not sign request with sigv4"); 
return NULL; @@ -77,7 +79,7 @@ static flb_sds_t add_aws_auth(struct flb_http_client *c, static int es_pack_map_content(msgpack_packer *tmp_pck, msgpack_object map, - struct flb_elasticsearch *ctx) + struct flb_elasticsearch_config *ec) { int i; char *ptr_key = NULL; @@ -126,7 +128,7 @@ static int es_pack_map_content(msgpack_packer *tmp_pck, * * https://goo.gl/R5NMTr */ - if (ctx->replace_dots == FLB_TRUE) { + if (ec->replace_dots == FLB_TRUE) { char *p = ptr_key; char *end = ptr_key + key_size; while (p != end) { @@ -151,7 +153,7 @@ static int es_pack_map_content(msgpack_packer *tmp_pck, */ if (v->type == MSGPACK_OBJECT_MAP) { msgpack_pack_map(tmp_pck, v->via.map.size); - es_pack_map_content(tmp_pck, *v, ctx); + es_pack_map_content(tmp_pck, *v, ec); } /* * The value can be any data type, if it's an array we need to @@ -159,7 +161,7 @@ static int es_pack_map_content(msgpack_packer *tmp_pck, */ else if (v->type == MSGPACK_OBJECT_ARRAY) { msgpack_pack_array(tmp_pck, v->via.array.size); - es_pack_array_content(tmp_pck, *v, ctx); + es_pack_array_content(tmp_pck, *v, ec); } else { msgpack_pack_object(tmp_pck, *v); @@ -174,7 +176,7 @@ static int es_pack_map_content(msgpack_packer *tmp_pck, */ static int es_pack_array_content(msgpack_packer *tmp_pck, msgpack_object array, - struct flb_elasticsearch *ctx) + struct flb_elasticsearch_config *ec) { int i; msgpack_object *e; @@ -184,12 +186,12 @@ static int es_pack_array_content(msgpack_packer *tmp_pck, if (e->type == MSGPACK_OBJECT_MAP) { msgpack_pack_map(tmp_pck, e->via.map.size); - es_pack_map_content(tmp_pck, *e, ctx); + es_pack_map_content(tmp_pck, *e, ec); } else if (e->type == MSGPACK_OBJECT_ARRAY) { msgpack_pack_array(tmp_pck, e->via.array.size); - es_pack_array_content(tmp_pck, *e, ctx); + es_pack_array_content(tmp_pck, *e, ec); } else { @@ -205,19 +207,20 @@ static int es_pack_array_content(msgpack_packer *tmp_pck, * If it failed, return NULL. 
*/ static flb_sds_t es_get_id_value(struct flb_elasticsearch *ctx, - msgpack_object *map) + struct flb_elasticsearch_config *ec, + msgpack_object *map) { struct flb_ra_value *rval = NULL; flb_sds_t tmp_str; - rval = flb_ra_get_value_object(ctx->ra_id_key, *map); + rval = flb_ra_get_value_object(ec->ra_id_key, *map); if (rval == NULL) { flb_plg_warn(ctx->ins, "the value of %s is missing", - ctx->id_key); + ec->id_key); return NULL; } else if(rval->o.type != MSGPACK_OBJECT_STR) { flb_plg_warn(ctx->ins, "the value of %s is not string", - ctx->id_key); + ec->id_key); flb_ra_key_value_destroy(rval); return NULL; } @@ -233,7 +236,7 @@ static flb_sds_t es_get_id_value(struct flb_elasticsearch *ctx, return tmp_str; } -static int compose_index_header(struct flb_elasticsearch *ctx, +static int compose_index_header(struct flb_elasticsearch_config *ec, int es_index_custom_len, char *logstash_index, size_t logstash_index_size, char *separator_str, @@ -248,7 +251,7 @@ static int compose_index_header(struct flb_elasticsearch *ctx, if (es_index_custom_len > 0) { p = logstash_index + es_index_custom_len; } else { - p = logstash_index + flb_sds_len(ctx->logstash_prefix); + p = logstash_index + flb_sds_len(ec->logstash_prefix); } len = p - logstash_index; ret = snprintf(p, logstash_index_size - len, "%s", @@ -261,7 +264,7 @@ static int compose_index_header(struct flb_elasticsearch *ctx, len += strlen(separator_str); s = strftime(p, logstash_index_size - len, - ctx->logstash_dateformat, tm); + ec->logstash_dateformat, tm); if (s==0) { /* exceed limit */ return -1; @@ -316,6 +319,7 @@ static int elasticsearch_format(struct flb_config *config, uint16_t hash[8]; int es_index_custom_len; struct flb_elasticsearch *ctx = plugin_context; + struct flb_elasticsearch_config *ec = flush_ctx; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; @@ -344,8 +348,8 @@ static int elasticsearch_format(struct flb_config *config, } /* Copy logstash prefix if logstash format is 
enabled */ - if (ctx->logstash_format == FLB_TRUE) { - strncpy(logstash_index, ctx->logstash_prefix, sizeof(logstash_index)); + if (ec->logstash_format == FLB_TRUE) { + strncpy(logstash_index, ec->logstash_prefix, sizeof(logstash_index)); logstash_index[sizeof(logstash_index) - 1] = '\0'; } @@ -356,25 +360,25 @@ static int elasticsearch_format(struct flb_config *config, * The header stored in 'j_index' will be used for the all records on * this payload. */ - if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) { + if (ec->logstash_format == FLB_FALSE && ec->generate_id == FLB_FALSE) { flb_time_get(&tms); gmtime_r(&tms.tm.tv_sec, &tm); strftime(index_formatted, sizeof(index_formatted) - 1, - ctx->index, &tm); + ec->index, &tm); es_index = index_formatted; - if (ctx->suppress_type_name) { + if (ec->suppress_type_name) { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT_WITHOUT_TYPE, - ctx->es_action, + ec->es_action, es_index); } else { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT, - ctx->es_action, - es_index, ctx->type); + ec->es_action, + es_index, ec->type); } } @@ -384,7 +388,7 @@ static int elasticsearch_format(struct flb_config *config, * in order to prevent generating millions of indexes * we can set to always use current time for index generation */ - if (ctx->current_time_index == FLB_TRUE) { + if (ec->current_time_index == FLB_TRUE) { flb_time_get(&tms); } @@ -394,7 +398,7 @@ static int elasticsearch_format(struct flb_config *config, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { /* Only pop time from record if current_time_index is disabled */ - if (ctx->current_time_index == FLB_FALSE) { + if (ec->current_time_index == FLB_FALSE) { flb_time_copy(&tms, &log_event.timestamp); } @@ -402,8 +406,8 @@ static int elasticsearch_format(struct flb_config *config, map_size = map.via.map.size; es_index_custom_len = 0; - if (ctx->logstash_prefix_key) { - flb_sds_t v = 
flb_ra_translate(ctx->ra_prefix_key, + if (ec->logstash_prefix_key) { + flb_sds_t v = flb_ra_translate(ec->ra_prefix_key, (char *) tag, tag_len, map, NULL); if (v) { @@ -424,7 +428,7 @@ static int elasticsearch_format(struct flb_config *config, msgpack_sbuffer_init(&tmp_sbuf); msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); - if (ctx->include_tag_key == FLB_TRUE) { + if (ec->include_tag_key == FLB_TRUE) { map_size++; } @@ -432,14 +436,14 @@ static int elasticsearch_format(struct flb_config *config, msgpack_pack_map(&tmp_pck, map_size + 1); /* Append the time key */ - msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->time_key)); - msgpack_pack_str_body(&tmp_pck, ctx->time_key, flb_sds_len(ctx->time_key)); + msgpack_pack_str(&tmp_pck, flb_sds_len(ec->time_key)); + msgpack_pack_str_body(&tmp_pck, ec->time_key, flb_sds_len(ec->time_key)); /* Format the time */ gmtime_r(&tms.tm.tv_sec, &tm); s = strftime(time_formatted, sizeof(time_formatted) - 1, - ctx->time_key_format, &tm); - if (ctx->time_key_nanos) { + ec->time_key_format, &tm); + if (ec->time_key_nanos) { len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, ".%09" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec); } else { @@ -452,47 +456,47 @@ static int elasticsearch_format(struct flb_config *config, msgpack_pack_str(&tmp_pck, s); msgpack_pack_str_body(&tmp_pck, time_formatted, s); - es_index = ctx->index; - if (ctx->logstash_format == FLB_TRUE) { - ret = compose_index_header(ctx, es_index_custom_len, + es_index = ec->index; + if (ec->logstash_format == FLB_TRUE) { + ret = compose_index_header(ec, es_index_custom_len, &logstash_index[0], sizeof(logstash_index), - ctx->logstash_prefix_separator, &tm); + ec->logstash_prefix_separator, &tm); if (ret < 0) { /* retry with default separator */ - compose_index_header(ctx, es_index_custom_len, + compose_index_header(ec, es_index_custom_len, &logstash_index[0], sizeof(logstash_index), "-", &tm); } es_index = logstash_index; - if (ctx->generate_id == 
FLB_FALSE) { - if (ctx->suppress_type_name) { + if (ec->generate_id == FLB_FALSE) { + if (ec->suppress_type_name) { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT_WITHOUT_TYPE, - ctx->es_action, + ec->es_action, es_index); } else { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT, - ctx->es_action, - es_index, ctx->type); + ec->es_action, + es_index, ec->type); } } } - else if (ctx->current_time_index == FLB_TRUE) { + else if (ec->current_time_index == FLB_TRUE) { /* Make sure we handle index time format for index */ strftime(index_formatted, sizeof(index_formatted) - 1, - ctx->index, &tm); + ec->index, &tm); es_index = index_formatted; } /* Tag Key */ - if (ctx->include_tag_key == FLB_TRUE) { - msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->tag_key)); - msgpack_pack_str_body(&tmp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key)); + if (ec->include_tag_key == FLB_TRUE) { + msgpack_pack_str(&tmp_pck, flb_sds_len(ec->tag_key)); + msgpack_pack_str_body(&tmp_pck, ec->tag_key, flb_sds_len(ec->tag_key)); msgpack_pack_str(&tmp_pck, tag_len); msgpack_pack_str_body(&tmp_pck, tag, tag_len); } @@ -504,7 +508,7 @@ static int elasticsearch_format(struct flb_config *config, * Elasticsearch have a restriction that key names cannot contain * a dot; if some dot is found, it's replaced with an underscore. 
*/ - ret = es_pack_map_content(&tmp_pck, map, ctx); + ret = es_pack_map_content(&tmp_pck, map, ec); if (ret == -1) { flb_log_event_decoder_destroy(&log_decoder); msgpack_sbuffer_destroy(&tmp_sbuf); @@ -513,43 +517,43 @@ static int elasticsearch_format(struct flb_config *config, return -1; } - if (ctx->generate_id == FLB_TRUE) { + if (ec->generate_id == FLB_TRUE) { MurmurHash3_x64_128(tmp_sbuf.data, tmp_sbuf.size, 42, hash); snprintf(es_uuid, sizeof(es_uuid), "%04x%04x-%04x-%04x-%04x-%04x%04x%04x", hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7]); - if (ctx->suppress_type_name) { + if (ec->suppress_type_name) { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE, - ctx->es_action, + ec->es_action, es_index, es_uuid); } else { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT_ID, - ctx->es_action, - es_index, ctx->type, es_uuid); + ec->es_action, + es_index, ec->type, es_uuid); } } - if (ctx->ra_id_key) { - id_key_str = es_get_id_value(ctx ,&map); + if (ec->ra_id_key) { + id_key_str = es_get_id_value(ctx, ec ,&map); if (id_key_str) { - if (ctx->suppress_type_name) { + if (ec->suppress_type_name) { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE, - ctx->es_action, + ec->es_action, es_index, id_key_str); } else { index_len = flb_sds_snprintf(&j_index, flb_sds_alloc(j_index), ES_BULK_INDEX_FMT_ID, - ctx->es_action, - es_index, ctx->type, id_key_str); + ec->es_action, + es_index, ec->type, id_key_str); } flb_sds_destroy(id_key_str); id_key_str = NULL; @@ -567,13 +571,13 @@ static int elasticsearch_format(struct flb_config *config, } out_buf_len = flb_sds_len(out_buf); - if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0) { + if (strcasecmp(ec->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0) { tmp_buf = out_buf; out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPDATE_OP_BODY) - 2); 
out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPDATE_OP_BODY, tmp_buf); flb_sds_destroy(tmp_buf); } - else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) { + else if (strcasecmp(ec->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) { tmp_buf = out_buf; out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPSERT_OP_BODY) - 2); out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPSERT_OP_BODY, tmp_buf); @@ -607,7 +611,7 @@ static int elasticsearch_format(struct flb_config *config, * return the bulk->ptr buffer */ flb_free(bulk); - if (ctx->trace_output) { + if (ec->trace_output) { fwrite(*out_data, 1, *out_size, stdout); fflush(stdout); } @@ -627,10 +631,6 @@ static int cb_es_init(struct flb_output_instance *ins, return -1; } - flb_plg_debug(ctx->ins, "host=%s port=%i uri=%s index=%s type=%s", - ins->host.name, ins->host.port, ctx->uri, - ctx->index, ctx->type); - flb_output_set_context(ins, ctx); /* @@ -642,6 +642,30 @@ static int cb_es_init(struct flb_output_instance *ins, return 0; } +struct flb_elasticsearch_config *flb_elasticsearch_target( + struct flb_elasticsearch *ctx, struct flb_upstream_node **node) +{ + struct flb_elasticsearch_config *ec; + struct flb_upstream_node *target_node; + + if (ctx->ha_mode == FLB_FALSE) { + ec = flb_es_upstream_conf(ctx, NULL); + *node = NULL; + return ec; + } + + target_node = flb_upstream_ha_node_get(ctx->ha); + if (!target_node) { + *node = NULL; + return NULL; + } + + ec = flb_es_upstream_conf(ctx, target_node); + *node = target_node; + + return ec; +} + static int elasticsearch_error_check(struct flb_elasticsearch *ctx, struct flb_http_client *c) { @@ -813,20 +837,38 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, size_t out_size; size_t b_sent; struct flb_elasticsearch *ctx = out_context; + struct flb_elasticsearch_config *ec; struct flb_connection *u_conn; + struct flb_upstream_node *node; struct flb_http_client *c; flb_sds_t signature = NULL; int 
compressed = FLB_FALSE; - /* Get upstream connection */ - u_conn = flb_upstream_conn_get(ctx->u); - if (!u_conn) { + ec = flb_elasticsearch_target(ctx, &node); + if (!ec) { FLB_OUTPUT_RETURN(FLB_RETRY); } + /* Get upstream connection */ + if (ctx->ha_mode == FLB_TRUE) { + u_conn = flb_upstream_conn_get(node->u); + if (!u_conn) { + flb_plg_error(ctx->ins, "no upstream connections available for %s node", + node->name); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + } + else { + u_conn = flb_upstream_conn_get(ctx->u); + if (!u_conn) { + flb_plg_error(ctx->ins, "no upstream connections available"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + } + /* Convert format */ ret = elasticsearch_format(config, ins, - ctx, NULL, + ctx, ec, event_chunk->type, event_chunk->tag, flb_sds_len(event_chunk->tag), event_chunk->data, event_chunk->size, @@ -840,7 +882,7 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, pack_size = out_size; /* Should we compress the payload ? */ - if (ctx->compress_gzip == FLB_TRUE) { + if (ec->compress_gzip == FLB_TRUE) { ret = flb_gzip_compress((void *) pack, pack_size, &out_buf, &out_size); if (ret == -1) { @@ -863,10 +905,10 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, } /* Compose HTTP Client request */ - c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri, + c = flb_http_client(u_conn, FLB_HTTP_POST, ec->uri, pack, pack_size, NULL, 0, NULL, 0); - flb_http_buffer_size(c, ctx->buffer_size); + flb_http_buffer_size(c, ec->buffer_size); #ifndef FLB_HAVE_AWS flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); @@ -874,16 +916,16 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, flb_http_add_header(c, "Content-Type", 12, "application/x-ndjson", 20); - if (ctx->http_user && ctx->http_passwd) { - flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); + if (ec->http_user && ec->http_passwd) { + flb_http_basic_auth(c, ec->http_user, ec->http_passwd); } - else if (ctx->cloud_user && ctx->cloud_passwd) { - 
flb_http_basic_auth(c, ctx->cloud_user, ctx->cloud_passwd); + else if (ec->cloud_user && ec->cloud_passwd) { + flb_http_basic_auth(c, ec->cloud_user, ec->cloud_passwd); } #ifdef FLB_HAVE_AWS - if (ctx->has_aws_auth == FLB_TRUE) { - signature = add_aws_auth(c, ctx); + if (ec->has_aws_auth == FLB_TRUE) { + signature = add_aws_auth(c, ctx, ec); if (!signature) { goto retry; } @@ -903,20 +945,20 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, ret = flb_http_do(c, &b_sent); if (ret != 0) { - flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ec->uri); goto retry; } else { /* The request was issued successfully, validate the 'error' field */ - flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri); + flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ec->uri); if (c->resp.status != 200 && c->resp.status != 201) { if (c->resp.payload_size > 0) { flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n", - c->resp.status, ctx->uri, c->resp.payload); + c->resp.status, ec->uri, c->resp.payload); } else { flb_plg_error(ctx->ins, "HTTP status=%i URI=%s", - c->resp.status, ctx->uri); + c->resp.status, ec->uri); } goto retry; } @@ -933,7 +975,7 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, } else { /* we got an error */ - if (ctx->trace_error) { + if (ec->trace_error) { /* * If trace_error is set, trace the actual * response from Elasticsearch explaining the problem. 
@@ -993,42 +1035,50 @@ static int elasticsearch_response_test(struct flb_config *config, { int ret = 0; struct flb_elasticsearch *ctx = plugin_context; + struct flb_elasticsearch_config *ec; + struct flb_upstream_node *node; struct flb_connection *u_conn; struct flb_http_client *c; size_t b_sent; + ec = flb_elasticsearch_target(ctx, &node); + if (!ec) { + flb_plg_warn(ctx->ins, "http_do=%i", ret); + return -2; + } + /* Not retrieve upstream connection */ u_conn = NULL; /* Compose HTTP Client request (dummy client) */ - c = flb_http_dummy_client(u_conn, FLB_HTTP_POST, ctx->uri, + c = flb_http_dummy_client(u_conn, FLB_HTTP_POST, ec->uri, NULL, 0, NULL, 0, NULL, 0); - flb_http_buffer_size(c, ctx->buffer_size); + flb_http_buffer_size(c, ec->buffer_size); /* Just stubbing the HTTP responses */ flb_http_set_response_test(c, "response", data, bytes, status, NULL, NULL); ret = flb_http_do(c, &b_sent); if (ret != 0) { - flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ec->uri); goto error; } if (ret != 0) { - flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ec->uri); goto error; } else { /* The request was issued successfully, validate the 'error' field */ - flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri); + flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ec->uri); if (c->resp.status != 200 && c->resp.status != 201) { if (c->resp.payload_size > 0) { flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n", - c->resp.status, ctx->uri, c->resp.payload); + c->resp.status, ec->uri, c->resp.payload); } else { flb_plg_error(ctx->ins, "HTTP status=%i URI=%s", - c->resp.status, ctx->uri); + c->resp.status, ec->uri); } goto error; } @@ -1068,48 +1118,48 @@ static int cb_es_exit(void *data, struct flb_config *config) /* Configuration properties map */ static struct flb_config_map config_map[] = { { - 
FLB_CONFIG_MAP_STR, "index", FLB_ES_DEFAULT_INDEX, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, index), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_INDEX, FLB_ES_DEFAULT_INDEX, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, index), "Set an index name" }, { - FLB_CONFIG_MAP_STR, "type", FLB_ES_DEFAULT_TYPE, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, type), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TYPE, FLB_ES_DEFAULT_TYPE, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, type), "Set the document type property" }, { - FLB_CONFIG_MAP_BOOL, "suppress_type_name", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, suppress_type_name), + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_SUPPRESS_TYPE_NAME, "false", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, suppress_type_name), "If true, mapping types is removed. (for v7.0.0 or later)" }, /* HTTP Authentication */ { - FLB_CONFIG_MAP_STR, "http_user", NULL, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_user), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_HTTP_USER, NULL, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, http_user), "Optional username credential for Elastic X-Pack access" }, { - FLB_CONFIG_MAP_STR, "http_passwd", "", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_passwd), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_HTTP_PASSWD, "", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, http_passwd), "Password for user defined in HTTP_User" }, /* HTTP Compression */ { - FLB_CONFIG_MAP_STR, "compress", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_COMPRESS, NULL, 0, FLB_FALSE, 0, "Set payload compression mechanism. 
Option available is 'gzip'" }, /* Cloud Authentication */ { - FLB_CONFIG_MAP_STR, "cloud_id", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_CLOUD_ID, NULL, 0, FLB_FALSE, 0, "Elastic cloud ID of the cluster to connect to" }, { - FLB_CONFIG_MAP_STR, "cloud_auth", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_CLOUD_AUTH, NULL, 0, FLB_FALSE, 0, "Elastic cloud authentication credentials" }, @@ -1117,38 +1167,38 @@ static struct flb_config_map config_map[] = { /* AWS Authentication */ #ifdef FLB_HAVE_AWS { - FLB_CONFIG_MAP_BOOL, "aws_auth", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, has_aws_auth), + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_AWS_AUTH, "false", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, has_aws_auth), "Enable AWS Sigv4 Authentication" }, { - FLB_CONFIG_MAP_STR, "aws_region", NULL, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_region), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_REGION, NULL, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_region), "AWS Region of your Amazon OpenSearch Service cluster" }, { - FLB_CONFIG_MAP_STR, "aws_sts_endpoint", NULL, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_sts_endpoint), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_STS_ENDPOINT, NULL, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_sts_endpoint), "Custom endpoint for the AWS STS API, used with the AWS_Role_ARN option" }, { - FLB_CONFIG_MAP_STR, "aws_role_arn", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN, NULL, 0, FLB_FALSE, 0, "AWS IAM Role to assume to put records to your Amazon OpenSearch cluster" }, { - FLB_CONFIG_MAP_STR, "aws_external_id", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID, NULL, 0, FLB_FALSE, 0, "External ID for the AWS IAM Role specified with `aws_role_arn`" }, { - FLB_CONFIG_MAP_STR, "aws_service_name", "es", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_service_name), + FLB_CONFIG_MAP_STR, 
FLB_ES_CONFIG_PROPERTY_AWS_SERVICE_NAME, "es", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_service_name), "AWS Service Name" }, { - FLB_CONFIG_MAP_STR, "aws_profile", NULL, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_profile), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_PROFILE, NULL, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_profile), "AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in " "$HOME/.aws/ directory." }, @@ -1156,69 +1206,69 @@ static struct flb_config_map config_map[] = { /* Logstash compatibility */ { - FLB_CONFIG_MAP_BOOL, "logstash_format", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_format), + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_LOGSTASH_FORMAT, "false", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_format), "Enable Logstash format compatibility" }, { - FLB_CONFIG_MAP_STR, "logstash_prefix", FLB_ES_DEFAULT_PREFIX, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX, FLB_ES_DEFAULT_PREFIX, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix), "When Logstash_Format is enabled, the Index name is composed using a prefix " "and the date, e.g: If Logstash_Prefix is equals to 'mydata' your index will " "become 'mydata-YYYY.MM.DD'. The last string appended belongs to the date " "when the data is being generated" }, { - FLB_CONFIG_MAP_STR, "logstash_prefix_separator", "-", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix_separator), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_SEPARATOR, "-", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix_separator), "Set a separator between logstash_prefix and date." 
}, { - FLB_CONFIG_MAP_STR, "logstash_prefix_key", NULL, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix_key), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_KEY, NULL, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix_key), "When included: the value in the record that belongs to the key will be looked " "up and over-write the Logstash_Prefix for index generation. If the key/value " "is not found in the record then the Logstash_Prefix option will act as a " "fallback. Nested keys are supported through record accessor pattern" }, { - FLB_CONFIG_MAP_STR, "logstash_dateformat", FLB_ES_DEFAULT_TIME_FMT, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_dateformat), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_DATEFORMAT, FLB_ES_DEFAULT_TIME_FMT, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_dateformat), "Time format (based on strftime) to generate the second part of the Index name" }, /* Custom Time and Tag keys */ { - FLB_CONFIG_MAP_STR, "time_key", FLB_ES_DEFAULT_TIME_KEY, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TIME_KEY, FLB_ES_DEFAULT_TIME_KEY, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key), "When Logstash_Format is enabled, each record will get a new timestamp field. 
" "The Time_Key property defines the name of that field" }, { - FLB_CONFIG_MAP_STR, "time_key_format", FLB_ES_DEFAULT_TIME_KEYF, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_format), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TIME_KEY_FORMAT, FLB_ES_DEFAULT_TIME_KEYF, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key_format), "When Logstash_Format is enabled, this property defines the format of the " "timestamp" }, { - FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_nanos), + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_TIME_KEY_NANOS, "false", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key_nanos), "When Logstash_Format is enabled, enabling this property sends nanosecond " "precision timestamps" }, { - FLB_CONFIG_MAP_BOOL, "include_tag_key", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, include_tag_key), + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_INCLUDE_TAG_KEY, "false", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, include_tag_key), "When enabled, it append the Tag name to the record" }, { - FLB_CONFIG_MAP_STR, "tag_key", FLB_ES_DEFAULT_TAG_KEY, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, tag_key), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TAG_KEY, FLB_ES_DEFAULT_TAG_KEY, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, tag_key), "When Include_Tag_Key is enabled, this property defines the key name for the tag" }, { - FLB_CONFIG_MAP_SIZE, "buffer_size", FLB_ES_DEFAULT_HTTP_MAX, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, buffer_size), + FLB_CONFIG_MAP_SIZE, FLB_ES_CONFIG_PROPERTY_BUFFER_SIZE, FLB_ES_DEFAULT_HTTP_MAX, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, buffer_size), "Specify the buffer size used to read the response from the Elasticsearch HTTP " "service. 
This option is useful for debugging purposes where is required to read " "full responses, note that response size grows depending of the number of records " @@ -1228,7 +1278,7 @@ static struct flb_config_map config_map[] = { /* Elasticsearch specifics */ { - FLB_CONFIG_MAP_STR, "path", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_PATH, NULL, 0, FLB_FALSE, 0, "Elasticsearch accepts new data on HTTP query path '/_bulk'. But it is also " "possible to serve Elasticsearch behind a reverse proxy on a subpath. This " @@ -1236,7 +1286,7 @@ static struct flb_config_map config_map[] = { "prefix in the indexing HTTP POST URI" }, { - FLB_CONFIG_MAP_STR, "pipeline", NULL, + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_PIPELINE, NULL, 0, FLB_FALSE, 0, "Newer versions of Elasticsearch allows to setup filters called pipelines. " "This option allows to define which pipeline the database should use. For " @@ -1244,46 +1294,52 @@ static struct flb_config_map config_map[] = { "Fluent Bit side, avoid pipelines" }, { - FLB_CONFIG_MAP_BOOL, "generate_id", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, generate_id), + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_GENERATE_ID, "false", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, generate_id), "When enabled, generate _id for outgoing records. This prevents duplicate " "records when retrying ES" }, { - FLB_CONFIG_MAP_STR, "write_operation", "create", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, write_operation), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_WRITE_OPERATION, "create", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, write_operation), "Operation to use to write in bulk requests" }, { - FLB_CONFIG_MAP_STR, "id_key", NULL, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, id_key), + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_ID_KEY, NULL, + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, id_key), "If set, _id will be the value of the key from incoming record." 
}, { - FLB_CONFIG_MAP_BOOL, "replace_dots", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots), + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_REPLACE_DOTS, "false", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, replace_dots), "When enabled, replace field name dots with underscore, required by Elasticsearch " "2.0-2.3." }, { - FLB_CONFIG_MAP_BOOL, "current_time_index", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, current_time_index), + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_CURRENT_TIME_INDEX, "false", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, current_time_index), "Use current time for index generation instead of message record" }, /* Trace */ { - FLB_CONFIG_MAP_BOOL, "trace_output", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_output), + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_TRACE_OUTPUT, "false", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, trace_output), "When enabled print the Elasticsearch API calls to stdout (for diag only)" }, { - FLB_CONFIG_MAP_BOOL, "trace_error", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_error), + FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_TRACE_ERROR, "false", + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, trace_error), "When enabled print the Elasticsearch exception to stderr (for diag only)" }, + { + FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_UPSTREAM, NULL, + 0, FLB_FALSE, 0, + "Path to 'upstream' configuration file (define multiple nodes)" + }, + /* EOF */ {0} }; diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index b6512ebc2a0..e4e8c46d59f 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -20,8 +20,12 @@ #ifndef FLB_OUT_ES_H #define FLB_OUT_ES_H +#include +#include +#include + #define FLB_ES_DEFAULT_HOST "127.0.0.1" -#define FLB_ES_DEFAULT_PORT 92000 +#define FLB_ES_DEFAULT_PORT 9200 #define FLB_ES_DEFAULT_INDEX "fluent-bit" #define FLB_ES_DEFAULT_TYPE "_doc" #define 
FLB_ES_DEFAULT_PREFIX "logstash" @@ -31,10 +35,6 @@ #define FLB_ES_DEFAULT_TAG_KEY "flb-key" #define FLB_ES_DEFAULT_HTTP_MAX "512k" #define FLB_ES_DEFAULT_HTTPS_PORT 443 -#define FLB_ES_WRITE_OP_INDEX "index" -#define FLB_ES_WRITE_OP_CREATE "create" -#define FLB_ES_WRITE_OP_UPDATE "update" -#define FLB_ES_WRITE_OP_UPSERT "upsert" #define FLB_ES_STATUS_SUCCESS (1 << 0) #define FLB_ES_STATUS_IMCOMPLETE (1 << 1) @@ -45,10 +45,12 @@ #define FLB_ES_STATUS_DUPLICATES (1 << 6) #define FLB_ES_STATUS_ERROR (1 << 7) -struct flb_elasticsearch { +struct flb_elasticsearch_config { /* Elasticsearch index (database) and type (table) */ char *index; + int own_index; char *type; + int own_type; int suppress_type_name; /* HTTP Auth */ @@ -57,7 +59,9 @@ struct flb_elasticsearch { /* Elastic Cloud Auth */ char *cloud_user; + int own_cloud_user; char *cloud_passwd; + int own_cloud_passwd; /* AWS Auth */ #ifdef FLB_HAVE_AWS @@ -66,14 +70,17 @@ struct flb_elasticsearch { char *aws_sts_endpoint; char *aws_profile; struct flb_aws_provider *aws_provider; + int own_aws_provider; struct flb_aws_provider *base_aws_provider; + int own_base_aws_provider; /* tls instances can't be re-used; aws provider requires a separate one */ struct flb_tls *aws_tls; - /* one for the standard chain provider, one for sts assume role */ + int own_aws_tls; struct flb_tls *aws_sts_tls; - char *aws_session_name; + int own_aws_sts_tls; char *aws_service_name; struct mk_list *aws_unsigned_headers; + int own_aws_unsigned_headers; #endif /* HTTP Client Setup */ @@ -100,50 +107,74 @@ struct flb_elasticsearch { /* prefix */ flb_sds_t logstash_prefix; + int own_logstash_prefix; flb_sds_t logstash_prefix_separator; + int own_logstash_prefix_separator; /* prefix key */ flb_sds_t logstash_prefix_key; + int own_logstash_prefix_key; /* date format */ flb_sds_t logstash_dateformat; + int own_logstash_dateformat; /* time key */ flb_sds_t time_key; + int own_time_key; /* time key format */ flb_sds_t time_key_format; + int 
own_time_key_format; /* time key nanoseconds */ int time_key_nanos; - /* write operation */ flb_sds_t write_operation; + int own_write_operation; /* write operation elasticsearch operation */ - flb_sds_t es_action; + const char *es_action; /* id_key */ flb_sds_t id_key; + int own_id_key; struct flb_record_accessor *ra_id_key; + int own_ra_id_key; /* include_tag_key */ int include_tag_key; flb_sds_t tag_key; + int own_tag_key; /* Elasticsearch HTTP API */ char uri[256]; struct flb_record_accessor *ra_prefix_key; + int own_ra_prefix_key; /* Compression mode (gzip) */ int compress_gzip; - /* Upstream connection to the backend server */ + /* List entry data for flb_elasticsearch->configs list */ + struct mk_list _head; +}; + +struct flb_elasticsearch { + /* if HA mode is enabled */ + int ha_mode; /* High Availability mode enabled ? */ + char *ha_upstream; /* Upstream configuration file */ + struct flb_upstream_ha *ha; + + /* Upstream handler and config context for single mode (no HA) */ struct flb_upstream *u; + struct mk_list configs; /* Plugin output instance reference */ struct flb_output_instance *ins; }; +struct flb_elasticsearch_config *flb_elasticsearch_target( + struct flb_elasticsearch *ctx, struct flb_upstream_node **node); + #endif diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index 4bc2977c5eb..293df5e1e9a 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -17,521 +17,899 @@ * limitations under the License. */ +#include +#include #include #include #include #include #include #include -#include -#include #include "es.h" +#include "es_conf_parse.h" +#include "es_conf_prop.h" #include "es_conf.h" -/* - * extract_cloud_host extracts the public hostname - * of a deployment from a Cloud ID string. - * - * The Cloud ID string has the format ":". - * Once decoded, the "base64_info" string has the format "$$" - * and the function returns "." token. 
- */ -static flb_sds_t extract_cloud_host(struct flb_elasticsearch *ctx, - const char *cloud_id) +static const char * const es_default_path = ""; +static const char * const es_write_op_index = FLB_ES_WRITE_OP_INDEX; +static const char * const es_write_op_create = FLB_ES_WRITE_OP_CREATE; +static const char * const es_write_op_update = FLB_ES_WRITE_OP_UPDATE; +static const char * const es_write_op_upsert = FLB_ES_WRITE_OP_UPSERT; + +static int config_set_ra_id_key(flb_sds_t id_key, struct flb_elasticsearch_config *ec, + struct flb_elasticsearch *ctx) { + if (!id_key) { + return 0; + } - char *colon; - char *region; - char *host; - char *port = NULL; - char buf[256] = {0}; - char cloud_host_buf[256] = {0}; - const char dollar[2] = "$"; - size_t len; - int ret; + ec->ra_id_key = flb_ra_create(id_key, FLB_FALSE); + if (ec->ra_id_key == NULL) { + flb_plg_error(ctx->ins, "could not create record accessor for Id Key"); + return -1; + } + ec->own_ra_id_key = FLB_TRUE; - /* keep only part after first ":" */ - colon = strchr(cloud_id, ':'); - if (colon == NULL) { - return NULL; + if (ec->generate_id == FLB_TRUE) { + flb_plg_warn(ctx->ins, "Generate_ID is ignored when ID_key is set"); + ec->generate_id = FLB_FALSE; } - colon++; - /* decode base64 */ - ret = flb_base64_decode((unsigned char *)buf, sizeof(buf), &len, (unsigned char *)colon, strlen(colon)); - if (ret) { - flb_plg_error(ctx->ins, "cannot decode cloud_id"); - return NULL; + return 0; +} + +static int config_set_es_action(const char *write_operation, + const struct flb_record_accessor *ra_id_key, + int generate_id, + struct flb_elasticsearch_config *ec, + struct flb_elasticsearch *ctx) +{ + if (!write_operation) { + return 0; } - region = strtok(buf, dollar); - if (region == NULL) { - return NULL; + + if (strcasecmp(write_operation, es_write_op_index) == 0) { + ec->es_action = es_write_op_index; } - host = strtok(NULL, dollar); - if (host == NULL) { - return NULL; + else if (strcasecmp(write_operation, 
es_write_op_create) == 0) { + ec->es_action = es_write_op_create; + } + else if (strcasecmp(write_operation, es_write_op_update) == 0 + || strcasecmp(write_operation, es_write_op_upsert) == 0) { + ec->es_action = es_write_op_update; + } + else { + flb_plg_error(ctx->ins, + "wrong Write_Operation (should be one of index, create, update, upsert)"); + return -1; } - /* - * Some cloud id format is "$:$" . - * e.g. https://github.com/elastic/beats/blob/v8.4.1/libbeat/cloudid/cloudid_test.go#L60 - * - * It means the variable "host" can contains ':' and port number. - */ - colon = strchr(host, ':'); - if (colon != NULL) { - /* host contains host number */ - *colon = '\0'; /* remove port number from host */ - port = colon+1; + if (strcasecmp(ec->es_action, es_write_op_update) == 0 + && !ra_id_key + && generate_id == FLB_FALSE) { + flb_plg_error(ctx->ins, + "Id_Key or Generate_Id must be set when Write_Operation update or upsert"); + return -1; } - strcpy(cloud_host_buf, host); - strcat(cloud_host_buf, "."); - strcat(cloud_host_buf, region); - if (port != NULL) { - strcat(cloud_host_buf, ":"); - strcat(cloud_host_buf, port); + return 0; +} + +static size_t config_adjust_buffer_size(size_t buffer_size) +{ + /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ + if (buffer_size == -1) { + return 0; } - return flb_sds_create(cloud_host_buf); + return buffer_size; } -/* - * set_cloud_credentials gets a cloud_auth - * and sets the context's cloud_user and cloud_passwd. 
- * Example: - * cloud_auth = elastic:ZXVyb3BxxxxxxZTA1Ng - * ----> - * cloud_user = elastic - * cloud_passwd = ZXVyb3BxxxxxxZTA1Ng - */ -static void set_cloud_credentials(struct flb_elasticsearch *ctx, - const char *cloud_auth) +static int config_is_compressed_gzip(const char *compress) { - /* extract strings */ - int items = 0; - struct mk_list *toks; - struct mk_list *head; - struct flb_split_entry *entry; - toks = flb_utils_split((const char *)cloud_auth, ':', -1); - mk_list_foreach(head, toks) { - items++; - entry = mk_list_entry(head, struct flb_split_entry, _head); - if (items == 1) { - ctx->cloud_user = flb_strdup(entry->value); - } - if (items == 2) { - ctx->cloud_passwd = flb_strdup(entry->value); - } + if (strcasecmp(compress, "gzip") == 0) { + return FLB_TRUE; } - flb_utils_split_free(toks); + return FLB_FALSE; } -struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, - struct flb_config *config) +static int config_set_pipeline(const char *path, const char *pipeline, + struct flb_elasticsearch_config *ec) { - int len; - int io_flags = 0; - ssize_t ret; - char *buf; - const char *tmp; - const char *path; -#ifdef FLB_HAVE_AWS - char *aws_role_arn = NULL; - char *aws_external_id = NULL; - char *aws_session_name = NULL; -#endif - char *cloud_port_char; - char *cloud_host = NULL; - int cloud_host_port = 0; - int cloud_port = FLB_ES_DEFAULT_HTTPS_PORT; - struct flb_uri *uri = ins->host.uri; - struct flb_uri_field *f_index = NULL; - struct flb_uri_field *f_type = NULL; - struct flb_upstream *upstream; - struct flb_elasticsearch *ctx; + int ret; - /* Allocate context */ - ctx = flb_calloc(1, sizeof(struct flb_elasticsearch)); - if (!ctx) { - flb_errno(); - return NULL; + if (!path) { + path = es_default_path; } - ctx->ins = ins; - if (uri) { - if (uri->count >= 2) { - f_index = flb_uri_get(uri, 0); - f_type = flb_uri_get(uri, 1); - } + if (pipeline && flb_str_emptyval(pipeline) != FLB_TRUE) { + ret = snprintf(ec->uri, sizeof(ec->uri) - 1, 
"%s/_bulk/?pipeline=%s", path, + pipeline); + } + else { + ret = snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk", path); } - /* handle cloud_id */ - tmp = flb_output_get_property("cloud_id", ins); - if (tmp) { - cloud_host = extract_cloud_host(ctx, tmp); - if (cloud_host == NULL) { - flb_plg_error(ctx->ins, "cannot extract cloud_host"); - flb_es_conf_destroy(ctx); - return NULL; - } - flb_plg_debug(ctx->ins, "extracted cloud_host: '%s'", cloud_host); + if (ret < 0 || ret >= sizeof(ec->uri)) { + return -1; + } + return 0; +} - cloud_port_char = strchr(cloud_host, ':'); +static int config_set_ra_prefix_key(flb_sds_t logstash_prefix_key, + struct flb_elasticsearch_config *ec, + struct flb_elasticsearch *ctx) +{ + size_t len; + char *buf; - if (cloud_port_char == NULL) { - flb_plg_debug(ctx->ins, "cloud_host: '%s' does not contain a port: '%s'", cloud_host, cloud_host); - } - else { - cloud_port_char[0] = '\0'; - cloud_port_char = &cloud_port_char[1]; - flb_plg_debug(ctx->ins, "extracted cloud_port_char: '%s'", cloud_port_char); - cloud_host_port = (int) strtol(cloud_port_char, (char **) NULL, 10); - flb_plg_debug(ctx->ins, "converted cloud_port_char to port int: '%i'", cloud_host_port); - } - - if (cloud_host_port == 0) { - cloud_host_port = cloud_port; + if (!logstash_prefix_key) { + return 0; + } + + if (logstash_prefix_key[0] != '$') { + len = flb_sds_len(logstash_prefix_key); + buf = flb_malloc(len + 2); + if (!buf) { + flb_errno(); + return -1; } + buf[0] = '$'; + memcpy(buf + 1, logstash_prefix_key, len); + buf[len + 1] = '\0'; + + ec->ra_prefix_key = flb_ra_create(buf, FLB_TRUE); + ec->own_ra_prefix_key = FLB_TRUE; + flb_free(buf); + } + else { + ec->ra_prefix_key = flb_ra_create(logstash_prefix_key, FLB_TRUE); + ec->own_ra_prefix_key = FLB_TRUE; + } - flb_plg_debug(ctx->ins, - "checked whether extracted port was null and set it to " - "default https port or not. 
Outcome: '%i' and cloud_host: '%s'.", - cloud_host_port, cloud_host); + if (!ec->ra_prefix_key) { + flb_plg_error(ctx->ins, "invalid logstash_prefix_key pattern '%s'", + logstash_prefix_key); + return -1; + } - if (ins->host.name != NULL) { - flb_sds_destroy(ins->host.name); - } + return 0; +} - ins->host.name = cloud_host; - ins->host.port = cloud_host_port; +static int config_set_properties(struct flb_elasticsearch_config *ec, + struct flb_elasticsearch *ctx, + struct flb_config *config) +{ + int ret; + const char *tmp; + struct flb_uri *uri = ctx->ins->host.uri; + struct flb_uri_field *f_index = NULL; + struct flb_uri_field *f_type = NULL; + + if (uri) { + if (uri->count >= 2) { + f_index = flb_uri_get(uri, 0); + f_type = flb_uri_get(uri, 1); + } } - /* Set default network configuration */ - flb_output_net_default("127.0.0.1", 9200, ins); + /* handle cloud_id */ + ret = flb_es_conf_set_cloud_auth( + flb_output_get_property(FLB_ES_CONFIG_PROPERTY_CLOUD_ID, ctx->ins), ctx); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot configure cloud_id"); + return -1; + } /* Populate context with config map defaults and incoming properties */ - ret = flb_output_config_map_set(ins, (void *) ctx); - if (ret == -1) { + ret = flb_output_config_map_set(ctx->ins, ec); + if (ret != 0) { flb_plg_error(ctx->ins, "configuration error"); - flb_es_conf_destroy(ctx); - return NULL; + return -1; } + ec->buffer_size = config_adjust_buffer_size(ec->buffer_size); + /* handle cloud_auth */ - tmp = flb_output_get_property("cloud_auth", ins); + ret = flb_es_conf_set_cloud_credentials( + flb_output_get_property(FLB_ES_CONFIG_PROPERTY_CLOUD_AUTH, ctx->ins), ec); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot configure cloud_auth"); + return -1; + } + + /* Compress (gzip) */ + tmp = flb_output_get_property(FLB_ES_CONFIG_PROPERTY_COMPRESS, ctx->ins); + ec->compress_gzip = FLB_FALSE; if (tmp) { - set_cloud_credentials(ctx, tmp); + ec->compress_gzip = config_is_compressed_gzip(tmp); } - /* use 
TLS ? */ - if (ins->use_tls == FLB_TRUE) { - io_flags = FLB_IO_TLS; + /* Set manual Index and Type */ + if (f_index) { + ec->index = flb_strdup(f_index->value); + ec->own_index = FLB_TRUE; } - else { - io_flags = FLB_IO_TCP; + + if (f_type) { + ec->type = flb_strdup(f_type->value); + ec->own_type = FLB_TRUE; } - if (ins->host.ipv6 == FLB_TRUE) { - io_flags |= FLB_IO_IPV6; + /* Elasticsearch: path and pipeline */ + ret = config_set_pipeline( + flb_output_get_property(FLB_ES_CONFIG_PROPERTY_PATH, ctx->ins), + flb_output_get_property(FLB_ES_CONFIG_PROPERTY_PIPELINE, ctx->ins), + ec); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot configure path and/or pipeline"); + return -1; } - /* Compress (gzip) */ - tmp = flb_output_get_property("compress", ins); - ctx->compress_gzip = FLB_FALSE; - if (tmp) { - if (strcasecmp(tmp, "gzip") == 0) { - ctx->compress_gzip = FLB_TRUE; - } + ret = config_set_ra_id_key(ec->id_key, ec, ctx); + if (ret != 0) { + return -1; } - /* Prepare an upstream handler */ - upstream = flb_upstream_create(config, - ins->host.name, - ins->host.port, - io_flags, - ins->tls); - if (!upstream) { - flb_plg_error(ctx->ins, "cannot create Upstream context"); - flb_es_conf_destroy(ctx); - return NULL; + ret = config_set_es_action(ec->write_operation, ec->ra_id_key, ec->generate_id, ec, + ctx); + if (ret != 0) { + return -1; } - ctx->u = upstream; - /* Set instance flags into upstream */ - flb_output_upstream_set(ctx->u, ins); + ret = config_set_ra_prefix_key(ec->logstash_prefix_key, ec, ctx); + if (ret != 0) { + return -1; + } - /* Set manual Index and Type */ - if (f_index) { - ctx->index = flb_strdup(f_index->value); /* FIXME */ +#ifdef FLB_HAVE_AWS + ret = flb_es_set_aws_unsigned_headers(ec); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot configure AWS unsigned headers"); + return -1; } - if (f_type) { - ctx->type = flb_strdup(f_type->value); /* FIXME */ + ret = flb_es_conf_set_aws_provider( + 
flb_output_get_property(FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID, ctx->ins), + flb_output_get_property(FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN, ctx->ins), + ec, ctx, config); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot configure AWS authentication"); + return -1; } +#endif - /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ - if (ctx->buffer_size == -1) { - ctx->buffer_size = 0; + return 0; +} + +static int config_set_node_properties(struct flb_upstream_node *node, + struct flb_elasticsearch_config *ec, + struct flb_elasticsearch_config *base, + struct flb_elasticsearch *ctx, + struct flb_config *config) +{ + const char *tmp; + int ret; + const char *path; + +#ifdef FLB_HAVE_AWS + const char *aws_external_id = NULL; + const char *aws_role_arn = NULL; + int aws_provider_node = FLB_FALSE; +#endif + + /* Copy base configuration */ + *ec = *base; + ec->own_index = FLB_FALSE; + ec->own_type = FLB_FALSE; + ec->own_cloud_user = FLB_FALSE; + ec->own_cloud_passwd = FLB_FALSE; + +#ifdef FLB_HAVE_AWS + ec->own_base_aws_provider = FLB_FALSE; + ec->own_aws_provider = FLB_FALSE; + ec->own_aws_tls = FLB_FALSE; + ec->own_aws_sts_tls = FLB_FALSE; + ec->own_aws_unsigned_headers = FLB_FALSE; +#endif + + ec->own_logstash_prefix = FLB_FALSE; + ec->own_logstash_prefix_separator = FLB_FALSE; + ec->own_logstash_prefix_key = FLB_FALSE; + ec->own_logstash_dateformat = FLB_FALSE; + ec->own_time_key = FLB_FALSE; + ec->own_time_key_format = FLB_FALSE; + ec->own_write_operation = FLB_FALSE; + ec->own_id_key = FLB_FALSE; + ec->own_ra_id_key = FLB_FALSE; + ec->own_ra_prefix_key = FLB_FALSE; + ec->own_tag_key = FLB_FALSE; + mk_list_entry_init(&ec->_head); + + /* Overwrite configuration from upstream node properties */ + + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_INDEX, node); + if (tmp) { + ec->index = (char *)tmp; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TYPE, node); + if (tmp) { + ec->type = (char *)tmp; + } + tmp = 
flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_SUPPRESS_TYPE_NAME, node); + if (tmp) { + ec->suppress_type_name = flb_utils_bool(tmp); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_HTTP_USER, node); + if (tmp) { + ec->http_user = (char *)tmp; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_HTTP_PASSWD, node); + if (tmp) { + ec->http_passwd = (char *)tmp; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_GENERATE_ID, node); + if (tmp) { + ec->generate_id = flb_utils_bool(tmp); } - /* Elasticsearch: Path */ - path = flb_output_get_property("path", ins); - if (!path) { - path = ""; +#ifdef FLB_HAVE_AWS + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_AUTH, node); + if (tmp) { + ec->has_aws_auth = flb_utils_bool(tmp); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_REGION, node); + if (tmp) { + ec->aws_region = (char *)tmp; + aws_provider_node = FLB_TRUE; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_STS_ENDPOINT, node); + if (tmp) { + ec->aws_sts_endpoint = (char *)tmp; + aws_provider_node = FLB_TRUE; } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_SERVICE_NAME, node); + if (tmp) { + ec->aws_service_name = (char *)tmp; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_PROFILE, node); + if (tmp) { + ec->aws_profile = (char *)tmp; + aws_provider_node = FLB_TRUE; + } + if (ec->has_aws_auth) { + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID, + node); + if (tmp) { + aws_external_id = tmp; + aws_provider_node = FLB_TRUE; + } + else { + aws_external_id = flb_output_get_property( + FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID, ctx->ins); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN, node); + if (tmp) { + aws_role_arn = tmp; + aws_provider_node = FLB_TRUE; + } + else { + aws_role_arn = flb_output_get_property(FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN, + ctx->ins); 
+ } + } +#endif - /* Elasticsearch: Pipeline */ - tmp = flb_output_get_property("pipeline", ins); + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_LOGSTASH_FORMAT, node); if (tmp) { - snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); + ec->logstash_format = flb_utils_bool(tmp); } - else { - snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path); + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX, node); + if (tmp) { + ec->logstash_prefix = flb_sds_create(tmp); + if (ec->logstash_prefix == NULL) { + return -1; + } + ec->own_logstash_prefix = FLB_TRUE; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_SEPARATOR, + node); + if (tmp) { + ec->logstash_prefix_separator = flb_sds_create(tmp); + if (ec->logstash_prefix_separator == NULL) { + return -1; + } + ec->own_logstash_prefix_separator = FLB_TRUE; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_LOGSTASH_DATEFORMAT, + node); + if (tmp) { + ec->logstash_dateformat = flb_sds_create(tmp); + if (ec->logstash_dateformat == NULL) { + return -1; + } + ec->own_logstash_dateformat = FLB_TRUE; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TIME_KEY, node); + if (tmp) { + ec->time_key = flb_sds_create(tmp); + if (ec->time_key == NULL) { + return -1; + } + ec->own_time_key = FLB_TRUE; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TIME_KEY_FORMAT, node); + if (tmp) { + ec->time_key_format = flb_sds_create(tmp); + if (ec->time_key_format == NULL) { + return -1; + } + ec->own_time_key_format = FLB_TRUE; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TIME_KEY_NANOS, node); + if (tmp) { + ec->time_key_nanos = flb_utils_bool(tmp); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_INCLUDE_TAG_KEY, node); + if (tmp) { + ec->include_tag_key = flb_utils_bool(tmp); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TAG_KEY, node); + 
if (tmp) { + ec->tag_key = flb_sds_create(tmp); + if (ec->tag_key == NULL) { + return -1; + } + ec->own_tag_key = FLB_TRUE; + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_BUFFER_SIZE, node); + if (tmp) { + ec->buffer_size = config_adjust_buffer_size(flb_utils_size_to_bytes(tmp)); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_REPLACE_DOTS, node); + if (tmp) { + ec->replace_dots = flb_utils_bool(tmp); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_CURRENT_TIME_INDEX, node); + if (tmp) { + ec->current_time_index = flb_utils_bool(tmp); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TRACE_OUTPUT, node); + if (tmp) { + ec->trace_output = flb_utils_bool(tmp); + } + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_TRACE_ERROR, node); + if (tmp) { + ec->trace_error = flb_utils_bool(tmp); } - if (ctx->id_key) { - ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE); - if (ctx->ra_id_key == NULL) { - flb_plg_error(ins, "could not create record accessor for Id Key"); + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_KEY, + node); + if (tmp) { + ec->logstash_prefix_key = flb_sds_create(tmp); + if (ec->logstash_prefix_key == NULL) { + return -1; } - if (ctx->generate_id == FLB_TRUE) { - flb_plg_warn(ins, "Generate_ID is ignored when ID_key is set"); - ctx->generate_id = FLB_FALSE; + ec->own_logstash_prefix_key = FLB_TRUE; + ret = config_set_ra_prefix_key(ec->logstash_prefix_key, ec, ctx); + if (ret != 0) { + return -1; + } + } + + /* handle cloud_auth */ + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_CLOUD_AUTH, node); + if (tmp) { + ret = flb_es_conf_set_cloud_credentials(tmp, ec); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot configure cloud_auth"); + return -1; } } - if (ctx->write_operation) { - if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_INDEX) == 0) { - ctx->es_action = flb_strdup(FLB_ES_WRITE_OP_INDEX); + /* Compress (gzip) */ 
+ tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_COMPRESS, node); + if (tmp) { + ec->compress_gzip = config_is_compressed_gzip(tmp); + } + + /* Elasticsearch: path and pipeline */ + path = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_PATH, node); + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_PIPELINE, node); + if (path || tmp) { + if (!path) { + path = flb_output_get_property(FLB_ES_CONFIG_PROPERTY_PATH, ctx->ins); } - else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_CREATE) == 0) { - ctx->es_action = flb_strdup(FLB_ES_WRITE_OP_CREATE); + if (!tmp) { + tmp = flb_output_get_property(FLB_ES_CONFIG_PROPERTY_PIPELINE, ctx->ins); } - else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0 - || strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) { - ctx->es_action = flb_strdup(FLB_ES_WRITE_OP_UPDATE); + ret = config_set_pipeline(path, tmp, ec); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot configure path and/or pipeline"); + return -1; } - else { - flb_plg_error(ins, "wrong Write_Operation (should be one of index, create, update, upsert)"); - flb_es_conf_destroy(ctx); - return NULL; + } + + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_ID_KEY, node); + if (tmp) { + ec->id_key = flb_sds_create(tmp); + if (ec->id_key == NULL) { + return -1; } - if (strcasecmp(ctx->es_action, FLB_ES_WRITE_OP_UPDATE) == 0 - && !ctx->ra_id_key && ctx->generate_id == FLB_FALSE) { - flb_plg_error(ins, "Id_Key or Generate_Id must be set when Write_Operation update or upsert"); - flb_es_conf_destroy(ctx); - return NULL; + ec->own_id_key = FLB_TRUE; + ret = config_set_ra_id_key(ec->id_key, ec, ctx); + if (ret != 0) { + return -1; } } - if (ctx->logstash_prefix_key) { - if (ctx->logstash_prefix_key[0] != '$') { - len = flb_sds_len(ctx->logstash_prefix_key); - buf = flb_malloc(len + 2); - if (!buf) { - flb_errno(); - flb_es_conf_destroy(ctx); - return NULL; - } - buf[0] = '$'; - memcpy(buf + 1, 
ctx->logstash_prefix_key, len); - buf[len + 1] = '\0'; - - ctx->ra_prefix_key = flb_ra_create(buf, FLB_TRUE); - flb_free(buf); + tmp = flb_upstream_node_get_property(FLB_ES_CONFIG_PROPERTY_WRITE_OPERATION, node); + if (tmp) { + ec->write_operation = flb_sds_create(tmp); + if (ec->write_operation == NULL) { + return -1; } - else { - ctx->ra_prefix_key = flb_ra_create(ctx->logstash_prefix_key, FLB_TRUE); + ec->own_write_operation = FLB_TRUE; + ret = config_set_es_action(ec->write_operation, ec->ra_id_key, ec->generate_id, + ec, ctx); + + if (ret != 0) { + return -1; } + } - if (!ctx->ra_prefix_key) { - flb_plg_error(ins, "invalid logstash_prefix_key pattern '%s'", tmp); - flb_es_conf_destroy(ctx); - return NULL; +#ifdef FLB_HAVE_AWS + if ((base->has_aws_auth != ec->has_aws_auth) + || (base->has_aws_auth == FLB_TRUE + && ec->has_aws_auth == FLB_TRUE + && aws_provider_node == FLB_TRUE)) { + ret = flb_es_conf_set_aws_provider(aws_external_id, aws_role_arn, ec, ctx, + config); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot configure AWS authentication"); + return -1; } } +#endif + + return 0; +} + +static void elasticsearch_config_destroy(struct flb_elasticsearch_config *ec) +{ + if (ec->own_tag_key == FLB_TRUE) { + flb_sds_destroy(ec->tag_key); + } + + if (ec->ra_id_key && ec->own_ra_id_key == FLB_TRUE) { + flb_ra_destroy(ec->ra_id_key); + ec->ra_id_key = NULL; + } + + if (ec->own_write_operation == FLB_TRUE) { + flb_sds_destroy(ec->write_operation); + } + + if (ec->own_id_key == FLB_TRUE) { + flb_sds_destroy(ec->id_key); + } + + if (ec->own_time_key_format == FLB_TRUE) { + flb_sds_destroy(ec->time_key_format); + } + + if (ec->own_time_key == FLB_TRUE) { + flb_sds_destroy(ec->time_key); + } + + if (ec->own_logstash_dateformat == FLB_TRUE) { + flb_sds_destroy(ec->logstash_dateformat); + } + + if (ec->own_logstash_prefix_key == FLB_TRUE) { + flb_sds_destroy(ec->logstash_prefix_key); + } + + if (ec->own_logstash_prefix_separator == FLB_TRUE) { + 
flb_sds_destroy(ec->logstash_prefix_separator); + } + + if (ec->own_logstash_prefix == FLB_TRUE) { + flb_sds_destroy(ec->logstash_prefix); + } #ifdef FLB_HAVE_AWS - /* AWS Auth Unsigned Headers */ - ctx->aws_unsigned_headers = flb_malloc(sizeof(struct mk_list)); + if (ec->base_aws_provider && ec->own_base_aws_provider == FLB_TRUE) { + flb_aws_provider_destroy(ec->base_aws_provider); + } + + if (ec->aws_provider && ec->own_aws_provider == FLB_TRUE) { + flb_aws_provider_destroy(ec->aws_provider); + } + + if (ec->aws_tls && ec->own_aws_tls == FLB_TRUE) { + flb_tls_destroy(ec->aws_tls); + } + + if (ec->aws_sts_tls && ec->own_aws_sts_tls == FLB_TRUE) { + flb_tls_destroy(ec->aws_sts_tls); + } + + if (ec->aws_unsigned_headers && ec->own_aws_unsigned_headers == FLB_TRUE) { + flb_slist_destroy(ec->aws_unsigned_headers); + flb_free(ec->aws_unsigned_headers); + } +#endif + + if (ec->ra_prefix_key && ec->own_ra_prefix_key == FLB_TRUE) { + flb_ra_destroy(ec->ra_prefix_key); + } + + if (ec->own_cloud_passwd == FLB_TRUE) { + flb_free(ec->cloud_passwd); + } + if (ec->own_cloud_user == FLB_TRUE) { + flb_free(ec->cloud_user); + } + + if (ec->own_type == FLB_TRUE) { + flb_free(ec->type); + } + + if (ec->own_index == FLB_TRUE) { + flb_free(ec->index); + } + + flb_free(ec); +} + +int es_config_ha(const char *upstream_file, struct flb_elasticsearch *ctx, + struct flb_config *config) +{ + int ret; + struct mk_list *head; + struct mk_list *tmp; + struct flb_upstream_node *node; + struct flb_elasticsearch_config *ec; + struct flb_elasticsearch_config *node_ec; + + /* Create main elasticsearch_config context */ + ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); + if (!ec) { + flb_errno(); + flb_plg_error(ctx->ins, "failed config allocation"); + return -1; + } + + /* Read properties into main elasticsearch_config context */ + ret = config_set_properties(ec, ctx, config); if (ret != 0) { - flb_es_conf_destroy(ctx); + elasticsearch_config_destroy(ec); + return -1; } - 
flb_slist_create(ctx->aws_unsigned_headers); - ret = flb_slist_add(ctx->aws_unsigned_headers, "Content-Length"); + + /* Create upstream nodes */ + ctx->ha_mode = FLB_TRUE; + ctx->ha = flb_upstream_ha_from_file(upstream_file, config); + if (!ctx->ha) { + flb_plg_error(ctx->ins, "cannot load Upstream file"); + elasticsearch_config_destroy(ec); + return -1; + } + + ret = flb_output_upstream_ha_set(ctx->ha, ctx->ins); if (ret != 0) { - flb_es_conf_destroy(ctx); - return NULL; + flb_upstream_ha_destroy(ctx->ha); + elasticsearch_config_destroy(ec); + return -1; } - /* AWS Auth */ - ctx->has_aws_auth = FLB_FALSE; - tmp = flb_output_get_property("aws_auth", ins); - if (tmp) { - if (strncasecmp(tmp, "On", 2) == 0) { - ctx->has_aws_auth = FLB_TRUE; - flb_debug("[out_es] Enabled AWS Auth"); - - /* AWS provider needs a separate TLS instance */ - ctx->aws_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, - FLB_TRUE, - ins->tls_debug, - ins->tls_vhost, - ins->tls_ca_path, - ins->tls_ca_file, - ins->tls_crt_file, - ins->tls_key_file, - ins->tls_key_passwd); - if (!ctx->aws_tls) { - flb_errno(); - flb_es_conf_destroy(ctx); - return NULL; - } - - tmp = flb_output_get_property("aws_region", ins); - if (!tmp) { - flb_error("[out_es] aws_auth enabled but aws_region not set"); - flb_es_conf_destroy(ctx); - return NULL; - } - ctx->aws_region = (char *) tmp; - - tmp = flb_output_get_property("aws_sts_endpoint", ins); - if (tmp) { - ctx->aws_sts_endpoint = (char *) tmp; - } - - ctx->aws_provider = flb_standard_chain_provider_create(config, - ctx->aws_tls, - ctx->aws_region, - ctx->aws_sts_endpoint, - NULL, - flb_aws_client_generator(), - ctx->aws_profile); - if (!ctx->aws_provider) { - flb_error("[out_es] Failed to create AWS Credential Provider"); - flb_es_conf_destroy(ctx); - return NULL; - } - - tmp = flb_output_get_property("aws_role_arn", ins); - if (tmp) { - /* Use the STS Provider */ - ctx->base_aws_provider = ctx->aws_provider; - aws_role_arn = (char *) tmp; - aws_external_id = NULL; - 
tmp = flb_output_get_property("aws_external_id", ins); - if (tmp) { - aws_external_id = (char *) tmp; - } - - aws_session_name = flb_sts_session_name(); - if (!aws_session_name) { - flb_error("[out_es] Failed to create aws iam role " - "session name"); - flb_es_conf_destroy(ctx); - return NULL; - } - - /* STS provider needs yet another separate TLS instance */ - ctx->aws_sts_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, - FLB_TRUE, - ins->tls_debug, - ins->tls_vhost, - ins->tls_ca_path, - ins->tls_ca_file, - ins->tls_crt_file, - ins->tls_key_file, - ins->tls_key_passwd); - if (!ctx->aws_sts_tls) { - flb_errno(); - flb_es_conf_destroy(ctx); - return NULL; - } - - ctx->aws_provider = flb_sts_provider_create(config, - ctx->aws_sts_tls, - ctx-> - base_aws_provider, - aws_external_id, - aws_role_arn, - aws_session_name, - ctx->aws_region, - ctx->aws_sts_endpoint, - NULL, - flb_aws_client_generator()); - /* Session name can be freed once provider is created */ - flb_free(aws_session_name); - if (!ctx->aws_provider) { - flb_error("[out_es] Failed to create AWS STS Credential " - "Provider"); - flb_es_conf_destroy(ctx); - return NULL; - } - - } - - /* initialize credentials in sync mode */ - ctx->aws_provider->provider_vtable->sync(ctx->aws_provider); - ctx->aws_provider->provider_vtable->init(ctx->aws_provider); - /* set back to async */ - ctx->aws_provider->provider_vtable->async(ctx->aws_provider); - ctx->aws_provider->provider_vtable->upstream_set(ctx->aws_provider, ctx->ins); + /* + * Iterate over upstreams nodes and create elasticsearch_config context + * for each node + */ + mk_list_foreach(head, &ctx->ha->nodes) { + node = mk_list_entry(head, struct flb_upstream_node, _head); + /* Create elasticsearch_config context for the upstream node */ + node_ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); + if (!node_ec) { + flb_errno(); + flb_plg_error(ctx->ins, "failed upstream node config allocation for %s node", + node->name); + ret = -1; + break; + } + + /* + * 
Fill elasticsearch_config context of the upstream node from: + * 1. main elasticsearch_config context + * 2. upstream node configuration section + */ + ret = config_set_node_properties(node, node_ec, ec, ctx, config); + if (ret != 0) { + flb_plg_error(ctx->ins, "failed upstream node configuration for %s node", + node->name); + elasticsearch_config_destroy(node_ec); + break; } + + /* Register allocated elasticsearch_config context for later cleanup */ + mk_list_add(&node_ec->_head, &ctx->configs); + + /* Set elasticsearch_config context into the node opaque data */ + flb_upstream_node_set_data(node_ec, node); } -#endif - return ctx; + if (ret != 0) { + /* Nullify each upstream node elasticsearch_config context */ + mk_list_foreach(head, &ctx->ha->nodes) { + node = mk_list_entry(head, struct flb_upstream_node, _head); + flb_upstream_node_set_data(NULL, node); + } + + /* Cleanup elasticsearch_config contexts which were created */ + mk_list_foreach_safe(head, tmp, &ctx->configs) { + node_ec = mk_list_entry(head, struct flb_elasticsearch_config, _head); + mk_list_del(&node_ec->_head); + elasticsearch_config_destroy(node_ec); + } + + flb_upstream_ha_destroy(ctx->ha); + elasticsearch_config_destroy(ec); + return -1; + } + + /* Register allocated elasticsearch_config context for later cleanup */ + mk_list_add(&ec->_head, &ctx->configs); + + return 0; } -int flb_es_conf_destroy(struct flb_elasticsearch *ctx) +int es_config_simple(struct flb_elasticsearch *ctx, struct flb_config *config) { - if (!ctx) { - return 0; + int ret; + struct flb_elasticsearch_config *ec; + int io_flags = 0; + + /* Set default network configuration */ + flb_output_net_default(FLB_ES_DEFAULT_HOST, FLB_ES_DEFAULT_PORT, ctx->ins); + + /* Create elasticsearch_config context */ + ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); + if (!ec) { + flb_errno(); + flb_plg_error(ctx->ins, "failed config allocation"); + return -1; } - if (ctx->u) { - flb_upstream_destroy(ctx->u); + /* Read properties 
into elasticsearch_config context */ + ret = config_set_properties(ec, ctx, config); + if (ret != 0) { + elasticsearch_config_destroy(ec); + return -1; } - if (ctx->ra_id_key) { - flb_ra_destroy(ctx->ra_id_key); - ctx->ra_id_key = NULL; + +#ifdef FLB_HAVE_TLS + /* use TLS ? */ + if (ctx->ins->use_tls == FLB_TRUE) { + io_flags = FLB_IO_TLS; } - if (ctx->es_action) { - flb_free(ctx->es_action); + else { + io_flags = FLB_IO_TCP; } +#else + io_flags = FLB_IO_TCP; +#endif -#ifdef FLB_HAVE_AWS - if (ctx->base_aws_provider) { - flb_aws_provider_destroy(ctx->base_aws_provider); + if (ctx->ins->host.ipv6 == FLB_TRUE) { + io_flags |= FLB_IO_IPV6; + } + + /* Create upstream */ + ctx->ha_mode = FLB_FALSE; + ctx->u = flb_upstream_create(config, + ctx->ins->host.name, + ctx->ins->host.port, + io_flags, + ctx->ins->tls); + if (!ctx->u) { + flb_plg_error(ctx->ins, "cannot create Upstream context"); + elasticsearch_config_destroy(ec); + return -1; + } + + ret = flb_output_upstream_set(ctx->u, ctx->ins); + if (ret != 0) { + flb_upstream_destroy(ctx->u); + elasticsearch_config_destroy(ec); + return -1; } - if (ctx->aws_provider) { - flb_aws_provider_destroy(ctx->aws_provider); + mk_list_add(&ec->_head, &ctx->configs); + + return 0; +} + +struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, + struct flb_config *config) +{ + int ret; + const char *upstream_file; + struct flb_elasticsearch *ctx; + + /* Allocate context */ + ctx = flb_calloc(1, sizeof(struct flb_elasticsearch)); + if (!ctx) { + flb_errno(); + return NULL; } - if (ctx->aws_tls) { - flb_tls_destroy(ctx->aws_tls); + ctx->ins = ins; + mk_list_init(&ctx->configs); + flb_output_set_context(ins, ctx); + + /* Configure HA or simple mode ? 
*/ + upstream_file = flb_output_get_property(FLB_ES_CONFIG_PROPERTY_UPSTREAM, ins); + if (upstream_file) { + ret = es_config_ha(upstream_file, ctx, config); + } + else { + ret = es_config_simple(ctx, config); + } + + if (ret != 0) { + flb_free(ctx); + return NULL; } - if (ctx->aws_sts_tls) { - flb_tls_destroy(ctx->aws_sts_tls); + return ctx; +} + +void flb_es_conf_destroy(struct flb_elasticsearch *ctx) +{ + struct flb_elasticsearch_config *ec; + struct mk_list *head; + struct mk_list *tmp; + + if (!ctx) { + return; } - if (ctx->aws_unsigned_headers) { - flb_slist_destroy(ctx->aws_unsigned_headers); - flb_free(ctx->aws_unsigned_headers); + /* Destroy upstreams */ + if (ctx->ha_mode == FLB_TRUE) { + if (ctx->ha) { + flb_upstream_ha_destroy(ctx->ha); + } } -#endif + else { + if (ctx->u) { + flb_upstream_destroy(ctx->u); + } + } + + /* Destroy elasticsearch_config contexts */ + mk_list_foreach_safe(head, tmp, &ctx->configs) { + ec = mk_list_entry(head, struct flb_elasticsearch_config, _head); - if (ctx->ra_prefix_key) { - flb_ra_destroy(ctx->ra_prefix_key); + mk_list_del(&ec->_head); + elasticsearch_config_destroy(ec); } - flb_free(ctx->cloud_passwd); - flb_free(ctx->cloud_user); flb_free(ctx); +} - return 0; +struct flb_elasticsearch_config *flb_es_upstream_conf(struct flb_elasticsearch *ctx, + struct flb_upstream_node *node) +{ + if (!ctx) { + return NULL; + } + if (node) { + /* Get elasticsearch_config stored in node opaque data */ + return flb_upstream_node_get_data(node); + } + if (mk_list_is_empty(&ctx->configs) == 0) { + return NULL; + } + return mk_list_entry_last(&ctx->configs, struct flb_elasticsearch_config, _head); } diff --git a/plugins/out_es/es_conf.h b/plugins/out_es/es_conf.h index ad46e17905b..39d04defb2b 100644 --- a/plugins/out_es/es_conf.h +++ b/plugins/out_es/es_conf.h @@ -20,14 +20,21 @@ #ifndef FLB_OUT_ES_CONF_H #define FLB_OUT_ES_CONF_H -#include -#include -#include +#include #include "es.h" +#define FLB_ES_WRITE_OP_INDEX "index" +#define 
FLB_ES_WRITE_OP_CREATE "create" +#define FLB_ES_WRITE_OP_UPDATE "update" +#define FLB_ES_WRITE_OP_UPSERT "upsert" + struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, struct flb_config *config); -int flb_es_conf_destroy(struct flb_elasticsearch *ctx); + +void flb_es_conf_destroy(struct flb_elasticsearch *ctx); + +struct flb_elasticsearch_config *flb_es_upstream_conf(struct flb_elasticsearch *ctx, + struct flb_upstream_node *node); #endif diff --git a/plugins/out_es/es_conf_parse.c b/plugins/out_es/es_conf_parse.c new file mode 100644 index 00000000000..9e81a9691b7 --- /dev/null +++ b/plugins/out_es/es_conf_parse.c @@ -0,0 +1,338 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include 
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "es_conf_parse.h"
+
+int flb_es_conf_set_cloud_credentials(const char *cloud_auth,
+                                      struct flb_elasticsearch_config *ec)
+{
+    /* extract strings */
+    int items = 0;
+    struct mk_list *toks;
+    struct mk_list *head;
+    struct flb_split_entry *entry;
+
+    if (!cloud_auth) {
+        return 0;
+    }
+
+    toks = flb_utils_split((const char *)cloud_auth, ':', -1);
+    mk_list_foreach(head, toks) {
+        items++;
+        entry = mk_list_entry(head, struct flb_split_entry, _head);
+        if (items == 1) {
+            ec->cloud_user = flb_strdup(entry->value);
+            ec->own_cloud_user = FLB_TRUE;
+        }
+        if (items == 2) {
+            ec->cloud_passwd = flb_strdup(entry->value);
+            ec->own_cloud_passwd = FLB_TRUE;
+        }
+    }
+    flb_utils_split_free(toks);
+
+    return 0;
+}
+
+/*
+ * extract_cloud_host extracts the public hostname
+ * of a deployment from a Cloud ID string.
+ *
+ * The Cloud ID string has the format "<deployment_name>:<base64_info>".
+ * Once decoded, the "base64_info" string has the format "<deployment_region>$<elasticsearch_hostname>$<kibana_hostname>"
+ * and the function returns the "<elasticsearch_hostname>.<deployment_region>" token.
+ */
+static flb_sds_t extract_cloud_host(const char *cloud_id, struct flb_elasticsearch *ctx)
+{
+
+    char *colon;
+    char *region;
+    char *host;
+    char *port = NULL;
+    char buf[256] = {0};
+    char cloud_host_buf[256] = {0};
+    const char dollar[2] = "$";
+    size_t len;
+    int ret;
+
+    /* keep only part after first ":" */
+    colon = strchr(cloud_id, ':');
+    if (colon == NULL) {
+        return NULL;
+    }
+    colon++;
+
+    /* decode base64 */
+    ret = flb_base64_decode((unsigned char *)buf, sizeof(buf), &len,
+                            (unsigned char *)colon, strlen(colon));
+    if (ret) {
+        flb_plg_error(ctx->ins, "cannot decode cloud_id");
+        return NULL;
+    }
+    region = strtok(buf, dollar);
+    if (region == NULL) {
+        return NULL;
+    }
+    host = strtok(NULL, dollar);
+    if (host == NULL) {
+        return NULL;
+    }
+
+    /*
+     * Some cloud id format is "<deployment_region>$<elasticsearch_hostname>:<port>$<kibana_hostname>" .
+     * e.g. 
https://github.com/elastic/beats/blob/v8.4.1/libbeat/cloudid/cloudid_test.go#L60
+     *
+     * It means the variable "host" can contain ':' and a port number.
+     */
+    colon = strchr(host, ':');
+    if (colon != NULL) {
+        /* host contains a port number */
+        *colon = '\0'; /* remove port number from host */
+        port = colon+1;
+    }
+
+    strcpy(cloud_host_buf, host);
+    strcat(cloud_host_buf, ".");
+    strcat(cloud_host_buf, region);
+    if (port != NULL) {
+        strcat(cloud_host_buf, ":");
+        strcat(cloud_host_buf, port);
+    }
+    return flb_sds_create(cloud_host_buf);
+}
+
+int flb_es_conf_set_cloud_auth(const char *cloud_auth, struct flb_elasticsearch *ctx)
+{
+    char *cloud_host;
+    int cloud_host_port = 0;
+    char *cloud_port_char;
+    int cloud_port = FLB_ES_DEFAULT_HTTPS_PORT;
+
+    if (!cloud_auth) {
+        return 0;
+    }
+
+    cloud_host = extract_cloud_host(cloud_auth, ctx);
+    if (cloud_host == NULL) {
+        flb_plg_error(ctx->ins, "cannot extract cloud_host");
+        return -1;
+    }
+    flb_plg_debug(ctx->ins, "extracted cloud_host: '%s'", cloud_host);
+
+    cloud_port_char = strchr(cloud_host, ':');
+
+    if (cloud_port_char == NULL) {
+        flb_plg_debug(ctx->ins, "cloud_host: '%s' does not contain a port: '%s'",
+                      cloud_host, cloud_host);
+    }
+    else {
+        cloud_port_char[0] = '\0';
+        cloud_port_char = &cloud_port_char[1];
+        flb_plg_debug(ctx->ins, "extracted cloud_port_char: '%s'", cloud_port_char);
+        cloud_host_port = (int)strtol(cloud_port_char, (char **)NULL, 10);
+        flb_plg_debug(ctx->ins, "converted cloud_port_char to port int: '%i'",
+                      cloud_host_port);
+    }
+
+    if (cloud_host_port == 0) {
+        cloud_host_port = cloud_port;
+    }
+
+    flb_plg_debug(ctx->ins,
+                  "checked whether extracted port was null and set it to "
+                  "default https port or not. 
Outcome: '%i' and cloud_host: '%s'.", + cloud_host_port, cloud_host); + + if (ctx->ins->host.name != NULL) { + flb_sds_destroy(ctx->ins->host.name); + } + + ctx->ins->host.name = cloud_host; + ctx->ins->host.port = cloud_host_port; + + return 0; +} + +#ifdef FLB_HAVE_AWS + +int flb_es_set_aws_unsigned_headers(struct flb_elasticsearch_config *ec) +{ + int ret; + + /* AWS Auth Unsigned Headers */ + ec->aws_unsigned_headers = flb_malloc(sizeof(struct mk_list)); + if (!ec->aws_unsigned_headers) { + flb_errno(); + return -1; + } + ec->own_aws_unsigned_headers = FLB_TRUE; + + flb_slist_create(ec->aws_unsigned_headers); + ret = flb_slist_add(ec->aws_unsigned_headers, "Content-Length"); + if (ret != 0) { + return -1; + } + + return 0; +} + +static int set_aws_sts_provider(const char *aws_external_id, + const char *aws_role_arn, + struct flb_elasticsearch_config *ec, + struct flb_elasticsearch *ctx, + struct flb_config *config) +{ + char *aws_session_name = NULL; + + if (!aws_role_arn) { + return 0; + } + + /* Use the STS Provider */ + ec->base_aws_provider = ec->aws_provider; + + aws_session_name = flb_sts_session_name(); + if (!aws_session_name) { + flb_error("[out_es] Failed to create aws iam role session name"); + return -1; + } + + /* STS provider needs yet another separate TLS instance */ + ec->aws_sts_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + ctx->ins->tls_debug, + ctx->ins->tls_vhost, + ctx->ins->tls_ca_path, + ctx->ins->tls_ca_file, + ctx->ins->tls_crt_file, + ctx->ins->tls_key_file, + ctx->ins->tls_key_passwd); + if (!ec->aws_sts_tls) { + flb_errno(); + flb_free(aws_session_name); + return -1; + } + ec->own_aws_sts_tls = FLB_TRUE; + + ec->aws_provider = flb_sts_provider_create(config, + ec->aws_sts_tls, + ec->base_aws_provider, + (char *)aws_external_id, + (char *)aws_role_arn, + aws_session_name, + ec->aws_region, + ec->aws_sts_endpoint, + NULL, + flb_aws_client_generator()); + ec->own_base_aws_provider = FLB_TRUE; + ec->own_aws_provider = FLB_TRUE; 
+ + /* Session name can be freed once provider is created */ + flb_free(aws_session_name); + + if (!ec->aws_provider) { + flb_error("[out_es] Failed to create AWS STS Credential Provider"); + return -1; + } + + return 0; +} + +int flb_es_conf_set_aws_provider(const char *aws_external_id, + const char *aws_role_arn, + struct flb_elasticsearch_config *ec, + struct flb_elasticsearch *ctx, + struct flb_config *config) +{ + int ret; + + if (ec->has_aws_auth == FLB_FALSE) { + ec->aws_tls = NULL; + ec->aws_provider = NULL; + ec->base_aws_provider = NULL; + return 0; + } + + flb_debug("[out_es] Enabled AWS Auth"); + + if (!ec->aws_region) { + flb_error("[out_es] aws_auth enabled but aws_region not set"); + return -1; + } + + /* AWS provider needs a separate TLS instance */ + ec->aws_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + ctx->ins->tls_debug, + ctx->ins->tls_vhost, + ctx->ins->tls_ca_path, + ctx->ins->tls_ca_file, + ctx->ins->tls_crt_file, + ctx->ins->tls_key_file, + ctx->ins->tls_key_passwd); + if (!ec->aws_tls) { + flb_errno(); + return -1; + } + ec->own_aws_tls = FLB_TRUE; + + ec->aws_provider = flb_standard_chain_provider_create(config, + ec->aws_tls, + ec->aws_region, + ec->aws_sts_endpoint, + NULL, + flb_aws_client_generator(), + ec->aws_profile); + if (!ec->aws_provider) { + flb_error("[out_es] Failed to create AWS Credential Provider"); + return -1; + } + ec->own_aws_provider = FLB_TRUE; + + ret = set_aws_sts_provider(aws_external_id, aws_role_arn, ec, ctx, config); + if (ret != 0) { + flb_error("[out_es] Failed to configure AWS role"); + return -1; + } + + /* initialize credentials in sync mode */ + ec->aws_provider->provider_vtable->sync(ec->aws_provider); + ec->aws_provider->provider_vtable->init(ec->aws_provider); + /* set back to async */ + ec->aws_provider->provider_vtable->async(ec->aws_provider); + ec->aws_provider->provider_vtable->upstream_set(ec->aws_provider, ctx->ins); + + return 0; +} + +#endif diff --git 
a/plugins/out_es/es_conf_parse.h b/plugins/out_es/es_conf_parse.h new file mode 100644 index 00000000000..1eaf8a6d068 --- /dev/null +++ b/plugins/out_es/es_conf_parse.h @@ -0,0 +1,53 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_ES_CONF_PARSE_H +#define FLB_OUT_ES_CONF_PARSE_H + +#include + +#include "es.h" + +/* + * flb_es_conf_set_cloud_credentials gets a cloud_auth + * and sets the context's cloud_user and cloud_passwd. 
+ * Example: + * cloud_auth = elastic:ZXVyb3BxxxxxxZTA1Ng + * ----> + * cloud_user = elastic + * cloud_passwd = ZXVyb3BxxxxxxZTA1Ng + */ +int flb_es_conf_set_cloud_credentials(const char *cloud_auth, + struct flb_elasticsearch_config *ec); + +int flb_es_conf_set_cloud_auth(const char *cloud_auth, struct flb_elasticsearch *ctx); + +#ifdef FLB_HAVE_AWS + +int flb_es_set_aws_unsigned_headers(struct flb_elasticsearch_config *ec); + +int flb_es_conf_set_aws_provider(const char *aws_external_id, + const char *aws_role_arn, + struct flb_elasticsearch_config *ec, + struct flb_elasticsearch *ctx, + struct flb_config *config); + +#endif + +#endif diff --git a/plugins/out_es/es_conf_prop.h b/plugins/out_es/es_conf_prop.h new file mode 100644 index 00000000000..b555f4d074b --- /dev/null +++ b/plugins/out_es/es_conf_prop.h @@ -0,0 +1,64 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FLB_OUT_ES_CONF_PROP_H +#define FLB_OUT_ES_CONF_PROP_H + +#define FLB_ES_CONFIG_PROPERTY_INDEX "index" +#define FLB_ES_CONFIG_PROPERTY_TYPE "type" +#define FLB_ES_CONFIG_PROPERTY_SUPPRESS_TYPE_NAME "suppress_type_name" +#define FLB_ES_CONFIG_PROPERTY_HTTP_USER "http_user" +#define FLB_ES_CONFIG_PROPERTY_HTTP_PASSWD "http_passwd" +#define FLB_ES_CONFIG_PROPERTY_COMPRESS "compress" +#define FLB_ES_CONFIG_PROPERTY_CLOUD_ID "cloud_id" +#define FLB_ES_CONFIG_PROPERTY_CLOUD_AUTH "cloud_auth" + +#ifdef FLB_HAVE_AWS +#define FLB_ES_CONFIG_PROPERTY_AWS_AUTH "aws_auth" +#define FLB_ES_CONFIG_PROPERTY_AWS_REGION "aws_region" +#define FLB_ES_CONFIG_PROPERTY_AWS_STS_ENDPOINT "aws_sts_endpoint" +#define FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN "aws_role_arn" +#define FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID "aws_external_id" +#define FLB_ES_CONFIG_PROPERTY_AWS_SERVICE_NAME "aws_service_name" +#define FLB_ES_CONFIG_PROPERTY_AWS_PROFILE "aws_profile" +#endif + +#define FLB_ES_CONFIG_PROPERTY_LOGSTASH_FORMAT "logstash_format" +#define FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX "logstash_prefix" +#define FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_SEPARATOR "logstash_prefix_separator" +#define FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_KEY "logstash_prefix_key" +#define FLB_ES_CONFIG_PROPERTY_LOGSTASH_DATEFORMAT "logstash_dateformat" +#define FLB_ES_CONFIG_PROPERTY_TIME_KEY "time_key" +#define FLB_ES_CONFIG_PROPERTY_TIME_KEY_FORMAT "time_key_format" +#define FLB_ES_CONFIG_PROPERTY_TIME_KEY_NANOS "time_key_nanos" +#define FLB_ES_CONFIG_PROPERTY_INCLUDE_TAG_KEY "include_tag_key" +#define FLB_ES_CONFIG_PROPERTY_TAG_KEY "tag_key" +#define FLB_ES_CONFIG_PROPERTY_BUFFER_SIZE "buffer_size" +#define FLB_ES_CONFIG_PROPERTY_PATH "path" +#define FLB_ES_CONFIG_PROPERTY_PIPELINE "pipeline" +#define FLB_ES_CONFIG_PROPERTY_GENERATE_ID "generate_id" +#define FLB_ES_CONFIG_PROPERTY_WRITE_OPERATION "write_operation" +#define FLB_ES_CONFIG_PROPERTY_ID_KEY "id_key" +#define 
FLB_ES_CONFIG_PROPERTY_REPLACE_DOTS "replace_dots" +#define FLB_ES_CONFIG_PROPERTY_CURRENT_TIME_INDEX "current_time_index" +#define FLB_ES_CONFIG_PROPERTY_TRACE_OUTPUT "trace_output" +#define FLB_ES_CONFIG_PROPERTY_TRACE_ERROR "trace_error" +#define FLB_ES_CONFIG_PROPERTY_UPSTREAM "upstream" + +#endif diff --git a/src/flb_engine_dispatch.c b/src/flb_engine_dispatch.c index b161888e679..903984a5ad3 100644 --- a/src/flb_engine_dispatch.c +++ b/src/flb_engine_dispatch.c @@ -101,18 +101,23 @@ int flb_engine_dispatch_retry(struct flb_task_retry *retry, static void test_run_formatter(struct flb_config *config, struct flb_input_instance *i_ins, struct flb_output_instance *o_ins, - struct flb_task *task, - void *flush_ctx) + struct flb_task *task) { int ret; void *out_buf = NULL; size_t out_size = 0; struct flb_test_out_formatter *otf; struct flb_event_chunk *evc; + void *flush_ctx; otf = &o_ins->test_formatter; evc = task->event_chunk; + flush_ctx = otf->flush_ctx; + if (otf->flush_ctx_callback) { + flush_ctx = otf->flush_ctx_callback(config, i_ins, o_ins->context, flush_ctx); + } + /* Invoke the output plugin formatter test callback */ ret = otf->callback(config, i_ins, @@ -176,9 +181,7 @@ static int tasks_start(struct flb_input_instance *in, out->test_formatter.callback != NULL) { /* Run the formatter test */ - test_run_formatter(config, in, out, - task, - out->test_formatter.flush_ctx); + test_run_formatter(config, in, out, task); /* Remove the route */ mk_list_del(&route->_head); diff --git a/src/flb_lib.c b/src/flb_lib.c index 0e4cde0dbaf..f077c1617e6 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -579,7 +579,7 @@ int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name, int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name, void (*out_callback) (void *, int, int, void *, size_t, void *), void *out_callback_data, - void *test_ctx) + void *flush_ctx) { struct flb_output_instance *o_ins; @@ -600,7 +600,41 @@ int flb_output_set_test(flb_ctx_t *ctx, 
int ffd, char *test_name, o_ins->test_formatter.rt_ffd = ffd; o_ins->test_formatter.rt_out_callback = out_callback; o_ins->test_formatter.rt_data = out_callback_data; - o_ins->test_formatter.flush_ctx = test_ctx; + o_ins->test_formatter.flush_ctx = flush_ctx; + } + else { + return -1; + } + + return 0; +} + +int flb_output_set_test_flush_ctx_callback(flb_ctx_t *ctx, int ffd, + char *test_name, + void *(*flush_ctx_callback) (struct flb_config *, + struct flb_input_instance *, + void *, void *), + void *flush_ctx) +{ + struct flb_output_instance *o_ins; + + o_ins = out_instance_get(ctx, ffd); + if (!o_ins) { + return -1; + } + + /* + * Enabling a test, set the output instance in 'test' mode, so no real + * flush callback is invoked, only the desired implemented test. + */ + + /* Formatter test */ + if (strcmp(test_name, "formatter") == 0) { + o_ins->test_mode = FLB_TRUE; + o_ins->test_formatter.rt_ctx = ctx; + o_ins->test_formatter.rt_ffd = ffd; + o_ins->test_formatter.flush_ctx = flush_ctx; + o_ins->test_formatter.flush_ctx_callback = flush_ctx_callback; } else { return -1; diff --git a/tests/runtime/out_elasticsearch.c b/tests/runtime/out_elasticsearch.c index eac72cbf321..351eaee1df2 100644 --- a/tests/runtime/out_elasticsearch.c +++ b/tests/runtime/out_elasticsearch.c @@ -1,11 +1,117 @@ /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +#include +#include +#include + +#include #include #include "flb_tests_runtime.h" /* Test data */ #include "data/es/json_es.h" /* JSON_ES */ +/* + * Include plugin headers to get the definition of structure used as flush context + * and to know how to extract that structure from plugin context. + */ +#include "../../plugins/out_es/es.h" + +static const char * const es_upstream_section_property_prefix = " "; +static const char * const es_upstream_section_value_prefix = " "; + +static const char *create_upstream_conf_file(const char *first_property, ...) 
+{ + char *upstream_conf_filename; + FILE *upstream_conf_file; + int ret; + const char *arg; + int arg_idx; + va_list args; + + upstream_conf_filename = tmpnam(NULL); + if (upstream_conf_filename == NULL) { + return NULL; + } + + upstream_conf_file = fopen(upstream_conf_filename, "w"); + if (upstream_conf_file == NULL) { + return NULL; + } + + ret = fprintf(upstream_conf_file, "%s\n%s%s%s%s\n%s\n%s%s%s%s\n%s%s%s%s\n%s%s%s%s\n", + "[UPSTREAM]", + es_upstream_section_property_prefix, + "name", es_upstream_section_value_prefix, "es-balancing", + "[NODE]", + es_upstream_section_property_prefix, + "name", es_upstream_section_value_prefix, "node1", + es_upstream_section_property_prefix, + "host", es_upstream_section_value_prefix, FLB_ES_DEFAULT_HOST, + es_upstream_section_property_prefix, + "port", es_upstream_section_value_prefix, "9200"); + if (ret < 0) { + fclose(upstream_conf_file); + remove(upstream_conf_filename); + return NULL; + } + + arg = first_property; + arg_idx = 0; + va_start(args, first_property); + while (arg != NULL) { + if (arg_idx == 0) { + ret = fprintf(upstream_conf_file, "%s%s", + es_upstream_section_property_prefix, arg); + } + else { + if (strlen(arg) > 0) { + ret = fprintf(upstream_conf_file, "%s%s\n", + es_upstream_section_value_prefix, arg); + } + else { + ret = fprintf(upstream_conf_file, "\n"); + } + } + if (ret < 0) { + va_end(args); + fclose(upstream_conf_file); + remove(upstream_conf_filename); + return NULL; + } + arg = va_arg(args, const char *); + arg_idx ^= 1; + } + va_end(args); + + if (arg_idx != 0) { + ret = fprintf(upstream_conf_file, "\n"); + if (ret < 0) { + fclose(upstream_conf_file); + remove(upstream_conf_filename); + return NULL; + } + } + + ret = fclose(upstream_conf_file); + if (ret != 0) { + remove(upstream_conf_filename); + return NULL; + } + + return upstream_conf_filename; +} + +static void *cb_flush_context(struct flb_config *config, struct flb_input_instance *ins, + void *plugin_context, void *flush_ctx) +{ + struct 
flb_upstream_node *node; + struct flb_elasticsearch *ctx = plugin_context; + (void) config; + (void) ins; + (void) flush_ctx; + return flb_elasticsearch_target(ctx, &node); +} static void cb_check_write_op_index(void *ctx, int ffd, int res_ret, void *res_data, @@ -196,8 +302,11 @@ void flb_test_write_operation_index() /* Enable test mode */ ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_write_op_index, - NULL, NULL); + cb_check_write_op_index, NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -242,6 +351,10 @@ void flb_test_write_operation_create() ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_write_op_create, NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -288,6 +401,10 @@ void flb_test_write_operation_update() ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_write_op_update, NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -334,6 +451,10 @@ void flb_test_write_operation_upsert() ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_write_op_upsert, NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -376,9 +497,12 @@ void flb_test_index_type() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_index_type, + ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_index_type, NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, 
"formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -425,6 +549,10 @@ void flb_test_logstash_format() ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_logstash_format, NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -472,6 +600,10 @@ void flb_test_logstash_format_nanos() ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_logstash_format_nanos, NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -514,9 +646,12 @@ void flb_test_tag_key() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_tag_key, + ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_tag_key, NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -558,9 +693,12 @@ void flb_test_replace_dots() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_replace_dots, + ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_replace_dots, NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -602,9 +740,12 @@ void flb_test_id_key() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_id_key, + ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_id_key, NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 
0); /* Start */ ret = flb_start(ctx); @@ -656,9 +797,12 @@ void flb_test_div0() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_nothing, + ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_nothing, NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -736,9 +880,12 @@ void flb_test_long_index() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_long_index, + ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_long_index, NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -786,6 +933,69 @@ void flb_test_logstash_prefix_separator() ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_logstash_prefix_separator, NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_upstream_write_operation() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *upstream_conf_filename; + + /* Override write_operation to index at upstream node level */ + upstream_conf_filename = create_upstream_conf_file("write_operation", "index", NULL); + TEST_ASSERT(upstream_conf_filename != NULL); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", 
NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override default write operation and enable id generation */ + flb_output_set(ctx, out_ffd, + "Write_Operation", "Upsert", + "Generate_Id", "True", + NULL); + + /* Use upstream servers */ + flb_output_set(ctx, out_ffd, + "Upstream", upstream_conf_filename, + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_write_op_index, + NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -797,6 +1007,251 @@ void flb_test_logstash_prefix_separator() sleep(2); flb_stop(ctx); flb_destroy(ctx); + + /* Cleanup temporary configuration file */ + TEST_ASSERT(remove(upstream_conf_filename) == 0); +} + +void flb_test_upstream_index_type() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *upstream_conf_filename; + + /* Override default index and type at upstream node level */ + upstream_conf_filename = create_upstream_conf_file("index", "index_test", + "type", "type_test", + NULL); + TEST_ASSERT(upstream_conf_filename != NULL); + + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Use upstream servers */ + flb_output_set(ctx, out_ffd, + "Upstream", upstream_conf_filename, + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_index_type, + NULL, 
NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); + + /* Cleanup temporary configuration file */ + TEST_ASSERT(remove(upstream_conf_filename) == 0); +} + +void flb_test_upstream_logstash_format() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *upstream_conf_filename; + + /* Specify Logstash format at upstream node level */ + upstream_conf_filename = create_upstream_conf_file("logstash_format", "on", + "logstash_prefix", "prefix", + "logstash_prefix_separator", "SEP", + "logstash_dateformat", "%Y-%m-%d", + NULL); + TEST_ASSERT(upstream_conf_filename != NULL); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Use configuration different from upstream node configuration */ + flb_output_set(ctx, out_ffd, + "logstash_format", "off", + "logstash_prefix", "logstash", + "logstash_prefix_separator", "-", + "logstash_dateformat", "%Y.%m.%d", + NULL); + + /* Use upstream servers */ + flb_output_set(ctx, out_ffd, + "Upstream", upstream_conf_filename, + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_logstash_prefix_separator, + NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); + + /* Start */ 
+ ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); + + /* Cleanup temporary configuration file */ + TEST_ASSERT(remove(upstream_conf_filename) == 0); +} + +void flb_test_upstream_replace_dots() +{ + int ret; + int size = sizeof(JSON_DOTS) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + const char *upstream_conf_filename; + + /* Specify replace_dots at upstream node level */ + upstream_conf_filename = create_upstream_conf_file("replace_dots", "on", + NULL); + TEST_ASSERT(upstream_conf_filename != NULL); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Use configuration different from upstream node configuration */ + flb_output_set(ctx, out_ffd, + "replace_dots", "off", + NULL); + + /* Use upstream servers */ + flb_output_set(ctx, out_ffd, + "Upstream", upstream_conf_filename, + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_replace_dots, + NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_DOTS, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); + + /* Cleanup temporary configuration file */ + TEST_ASSERT(remove(upstream_conf_filename) == 0); +} + +void flb_test_upstream_id_key() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + flb_ctx_t *ctx; + int in_ffd; + int 
out_ffd; + const char *upstream_conf_filename; + + /* Specify id_key at upstream node level */ + upstream_conf_filename = create_upstream_conf_file("id_key", "key_2", NULL); + TEST_ASSERT(upstream_conf_filename != NULL); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Use upstream servers */ + flb_output_set(ctx, out_ffd, + "Upstream", upstream_conf_filename, + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", cb_check_id_key, + NULL, NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set_test_flush_ctx_callback(ctx, out_ffd, "formatter", + cb_flush_context, NULL); + TEST_CHECK(ret == 0); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); + + /* Cleanup temporary configuration file */ + TEST_ASSERT(remove(upstream_conf_filename) == 0); } static void cb_check_response_success(void *ctx, int ffd, @@ -975,5 +1430,10 @@ TEST_LIST = { {"response_success" , flb_test_response_success }, {"response_successes", flb_test_response_successes }, {"response_partially_success" , flb_test_response_partially_success }, + {"upstream_write_operation" , flb_test_upstream_write_operation }, + {"upstream_index_type" , flb_test_upstream_index_type }, + {"upstream_logstash_format" , flb_test_upstream_logstash_format }, + {"upstream_replace_dots" , flb_test_upstream_replace_dots }, + {"upstream_id_key" , flb_test_upstream_id_key }, {NULL, NULL} };