Skip to content

Commit

Permalink
pipeline: outputs: es: fixed tests after modification of formatter ca…
Browse files Browse the repository at this point in the history
…llback contract

Signed-off-by: Marat Abrarov <[email protected]>
  • Loading branch information
mabrarov committed Jul 7, 2023
1 parent 828d7b5 commit 3434d61
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 42 deletions.
6 changes: 3 additions & 3 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <fluent-bit/flb_record_accessor.h>
#include <fluent-bit/flb_ra_key.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_upstream_node.h>

#include "es.h"
#include "es_conf.h"
Expand Down Expand Up @@ -647,7 +648,7 @@ static struct flb_elasticsearch_config *flb_elasticsearch_target(
struct flb_upstream_node *target_node;

if (ctx->ha_mode == FLB_FALSE) {
ec = mk_list_entry_first(&ctx->configs, struct flb_elasticsearch_config, _head);
ec = flb_es_upstream_conf(ctx, NULL);
*node = NULL;
return ec;
}
Expand All @@ -658,8 +659,7 @@ static struct flb_elasticsearch_config *flb_elasticsearch_target(
return NULL;
}

/* Get elasticsearch_config stored in node opaque data */
ec = flb_upstream_node_get_data(target_node);
ec = flb_es_upstream_conf(ctx, target_node);
*node = target_node;

return ec;
Expand Down
16 changes: 16 additions & 0 deletions plugins/out_es/es_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,19 @@ void flb_es_conf_destroy(struct flb_elasticsearch *ctx)

flb_free(ctx);
}

struct flb_elasticsearch_config *flb_es_upstream_conf(struct flb_elasticsearch *ctx,
struct flb_upstream_node *node)
{
if (!ctx) {
return NULL;
}
if (node) {
/* Get elasticsearch_config stored in node opaque data */
return flb_upstream_node_get_data(node);
}
if (mk_list_is_empty(&ctx->configs) == 0) {
return NULL;
}
return mk_list_entry_last(&ctx->configs, struct flb_elasticsearch_config, _head);
}
5 changes: 5 additions & 0 deletions plugins/out_es/es_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@
#ifndef FLB_OUT_ES_CONF_H
#define FLB_OUT_ES_CONF_H

#include <fluent-bit/flb_upstream_node.h>

#include "es.h"

struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
struct flb_config *config);

void flb_es_conf_destroy(struct flb_elasticsearch *ctx);

struct flb_elasticsearch_config *flb_es_upstream_conf(struct flb_elasticsearch *ctx,
struct flb_upstream_node *node);

#endif
108 changes: 69 additions & 39 deletions tests/runtime/out_elasticsearch.c
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

#include <monkey/mk_core/mk_list.h>
#include <fluent-bit.h>
#include "flb_tests_runtime.h"

/* Test data */
#include "data/es/json_es.h" /* JSON_ES */

/*
* Include plugin headers to get the definition of structure used as flush context
* and to know how to extract that structure from plugin context.
*/
#include "../../plugins/out_es/es.h"
#include "../../plugins/out_es/es_conf.h"

static void *cb_flush_context(struct flb_config *config, struct flb_input_instance *ins,
void *plugin_context, void *flush_ctx)
{
struct flb_elasticsearch *ctx = plugin_context;
(void) config;
(void) ins;
(void) flush_ctx;
return flb_es_upstream_conf(ctx, NULL);
}

static void cb_check_write_op_index(void *ctx, int ffd,
int res_ret, void *res_data,
Expand Down Expand Up @@ -195,9 +212,10 @@ void flb_test_write_operation_index()
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_write_op_index,
NULL, NULL);
ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter",
cb_check_write_op_index,
NULL, NULL, cb_flush_context);
TEST_CHECK(ret == 0);

/* Start */
ret = flb_start(ctx);
Expand Down Expand Up @@ -239,9 +257,10 @@ void flb_test_write_operation_create()
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_write_op_create,
NULL, NULL);
ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter",
cb_check_write_op_create,
NULL, NULL, cb_flush_context);
TEST_CHECK(ret == 0);

/* Start */
ret = flb_start(ctx);
Expand Down Expand Up @@ -285,9 +304,10 @@ void flb_test_write_operation_update()
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_write_op_update,
NULL, NULL);
ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter",
cb_check_write_op_update,
NULL, NULL, cb_flush_context);
TEST_CHECK(ret == 0);

/* Start */
ret = flb_start(ctx);
Expand Down Expand Up @@ -331,9 +351,10 @@ void flb_test_write_operation_upsert()
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_write_op_upsert,
NULL, NULL);
ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter",
cb_check_write_op_upsert,
NULL, NULL, cb_flush_context);
TEST_CHECK(ret == 0);

/* Start */
ret = flb_start(ctx);
Expand Down Expand Up @@ -376,9 +397,10 @@ void flb_test_index_type()
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_index_type,
NULL, NULL);
ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter",
cb_check_index_type,
NULL, NULL, cb_flush_context);
TEST_CHECK(ret == 0);

/* Start */
ret = flb_start(ctx);
Expand Down Expand Up @@ -422,9 +444,10 @@ void flb_test_logstash_format()
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_logstash_format,
NULL, NULL);
ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter",
cb_check_logstash_format,
NULL, NULL, cb_flush_context);
TEST_CHECK(ret == 0);

/* Start */
ret = flb_start(ctx);
Expand Down Expand Up @@ -469,9 +492,10 @@ void flb_test_logstash_format_nanos()
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_logstash_format_nanos,
NULL, NULL);
ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter",
cb_check_logstash_format_nanos,
NULL, NULL, cb_flush_context);
TEST_CHECK(ret == 0);

/* Start */
ret = flb_start(ctx);
Expand Down Expand Up @@ -514,9 +538,10 @@ void flb_test_tag_key()
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_tag_key,
NULL, NULL);
ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter",
cb_check_tag_key,
NULL, NULL, cb_flush_context);
TEST_CHECK(ret == 0);

/* Start */
ret = flb_start(ctx);
Expand Down Expand Up @@ -558,9 +583,10 @@ void flb_test_replace_dots()
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_replace_dots,
NULL, NULL);
ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter",
cb_check_replace_dots,
NULL, NULL, cb_flush_context);
TEST_CHECK(ret == 0);

/* Start */
ret = flb_start(ctx);
Expand Down Expand Up @@ -602,9 +628,10 @@ void flb_test_id_key()
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_id_key,
NULL, NULL);
ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter",
cb_check_id_key,
NULL, NULL, cb_flush_context);
TEST_CHECK(ret == 0);

/* Start */
ret = flb_start(ctx);
Expand Down Expand Up @@ -656,9 +683,10 @@ void flb_test_div0()
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_nothing,
NULL, NULL);
ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter",
cb_check_nothing,
NULL, NULL, cb_flush_context);
TEST_CHECK(ret == 0);

/* Start */
ret = flb_start(ctx);
Expand Down Expand Up @@ -736,9 +764,10 @@ void flb_test_long_index()
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_long_index,
NULL, NULL);
ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter",
cb_check_long_index,
NULL, NULL, cb_flush_context);
TEST_CHECK(ret == 0);

/* Start */
ret = flb_start(ctx);
Expand Down Expand Up @@ -783,9 +812,10 @@ void flb_test_logstash_prefix_separator()
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_logstash_prefix_separator,
NULL, NULL);
ret = flb_output_set_test_with_ctx_callback(ctx, out_ffd, "formatter",
cb_check_logstash_prefix_separator,
NULL, NULL, cb_flush_context);
TEST_CHECK(ret == 0);

/* Start */
ret = flb_start(ctx);
Expand Down

0 comments on commit 3434d61

Please sign in to comment.