diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 80eb94aa010..5ac72b0c306 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -31,6 +31,7 @@ #include <fluent-bit/flb_record_accessor.h> #include <fluent-bit/flb_ra_key.h> #include <fluent-bit/flb_log_event_decoder.h> +#include <fluent-bit/flb_upstream_node.h> #include "es.h" #include "es_conf.h" @@ -647,7 +648,7 @@ static struct flb_elasticsearch_config *flb_elasticsearch_target( struct flb_upstream_node *target_node; if (ctx->ha_mode == FLB_FALSE) { - ec = mk_list_entry_first(&ctx->configs, struct flb_elasticsearch_config, _head); + ec = flb_es_upstream_conf(ctx, NULL); *node = NULL; return ec; } @@ -658,8 +659,7 @@ static struct flb_elasticsearch_config *flb_elasticsearch_target( return NULL; } - /* Get elasticsearch_config stored in node opaque data */ - ec = flb_upstream_node_get_data(target_node); + ec = flb_es_upstream_conf(ctx, target_node); *node = target_node; return ec; diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index c5f3bee54f8..4ff27a5cfbd 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -428,3 +428,19 @@ void flb_es_conf_destroy(struct flb_elasticsearch *ctx) flb_free(ctx); } + +struct flb_elasticsearch_config *flb_es_upstream_conf(struct flb_elasticsearch *ctx, + struct flb_upstream_node *node) +{ + if (!ctx) { + return NULL; + } + if (node) { + /* Get elasticsearch_config stored in node opaque data */ + return flb_upstream_node_get_data(node); + } + if (mk_list_is_empty(&ctx->configs) == 0) { + return NULL; + } + return mk_list_entry_last(&ctx->configs, struct flb_elasticsearch_config, _head); +} diff --git a/plugins/out_es/es_conf.h b/plugins/out_es/es_conf.h index 1161c87a226..7cfcf595730 100644 --- a/plugins/out_es/es_conf.h +++ b/plugins/out_es/es_conf.h @@ -20,6 +20,8 @@ #ifndef FLB_OUT_ES_CONF_H #define FLB_OUT_ES_CONF_H +#include <fluent-bit/flb_upstream_node.h> + #include "es.h" struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, @@ -27,4 +29,7 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, void flb_es_conf_destroy(struct flb_elasticsearch *ctx); +struct flb_elasticsearch_config *flb_es_upstream_conf(struct flb_elasticsearch *ctx, + struct flb_upstream_node *node); + #endif diff --git a/tests/runtime/out_elasticsearch.c b/tests/runtime/out_elasticsearch.c index eac72cbf321..914e363ae8e 100644 --- a/tests/runtime/out_elasticsearch.c +++ b/tests/runtime/out_elasticsearch.c @@ -1,11 +1,28 @@ /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +#include <monkey/mk_core/mk_list.h> #include <fluent-bit.h> #include "flb_tests_runtime.h" /* Test data */ #include "data/es/json_es.h" /* JSON_ES */ +/* + * Include plugin headers to get the definition of structure used as flush context + * and to know how to extract that structure from plugin context. + */ +#include "../../plugins/out_es/es.h" +#include "../../plugins/out_es/es_conf.h" + +static void *cb_flush_context(struct flb_config *config, struct flb_input_instance *ins, + void *plugin_context, void *flush_ctx) +{ + struct flb_elasticsearch *ctx = plugin_context; + (void) config; + (void) ins; + (void) flush_ctx; + return flb_es_upstream_conf(ctx, NULL); +} static void cb_check_write_op_index(void *ctx, int ffd, int res_ret, void *res_data, @@ -195,9 +212,10 @@ void flb_test_write_operation_index() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_write_op_index, - NULL, NULL); + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_write_op_index, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -239,9 +257,10 @@ void flb_test_write_operation_create() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_write_op_create, - NULL, NULL); + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_write_op_create, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -285,9 +304,10 @@ void flb_test_write_operation_update() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_write_op_update, - NULL, NULL); + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_write_op_update, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -331,9 +351,10 @@ void flb_test_write_operation_upsert() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_write_op_upsert, - NULL, NULL); + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_write_op_upsert, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -376,9 +397,10 @@ void flb_test_index_type() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_index_type, - NULL, NULL); + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_index_type, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -422,9 +444,10 @@ void flb_test_logstash_format() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_logstash_format, - NULL, NULL); + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_logstash_format, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -469,9 +492,10 @@ void flb_test_logstash_format_nanos() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_logstash_format_nanos, - NULL, NULL); + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_logstash_format_nanos, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -514,9 +538,10 @@ void flb_test_tag_key() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_tag_key, - NULL, NULL); + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_tag_key, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -558,9 +583,10 @@ void flb_test_replace_dots() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_replace_dots, - NULL, NULL); + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_replace_dots, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -602,9 +628,10 @@ void flb_test_id_key() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_id_key, - NULL, NULL); + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_id_key, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -656,9 +683,10 @@ void flb_test_div0() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_nothing, - NULL, NULL); + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_nothing, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -736,9 +764,10 @@ void flb_test_long_index() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_long_index, - NULL, NULL); + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_long_index, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx); @@ -783,9 +812,10 @@ void flb_test_logstash_prefix_separator() NULL); /* Enable test mode */ - ret = flb_output_set_test(ctx, out_ffd, "formatter", - cb_check_logstash_prefix_separator, - NULL, NULL); + ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter", + cb_check_logstash_prefix_separator, + NULL, NULL, cb_flush_context); + TEST_CHECK(ret == 0); /* Start */ ret = flb_start(ctx);