Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_es: Add target_index variable using record accessor syntax. #7716

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ static int elasticsearch_format(struct flb_config *config,
size_t off = 0;
size_t off_prev = 0;
char *es_index;
char index_value[256];
char logstash_index[256];
char time_formatted[256];
char index_formatted[256];
Expand Down Expand Up @@ -350,13 +351,15 @@ static int elasticsearch_format(struct flb_config *config,
}

/*
* If logstash format and id generation are disabled, pre-generate
* the index line for all records.
* If logstash format, index record accessor and id generation are disabled,
* pre-generate the index line for all records.
*
* The header stored in 'j_index' will be used for the all records on
* this payload.
*/
if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) {
if (ctx->logstash_format == FLB_FALSE &&
ctx->generate_id == FLB_FALSE && ctx->ra_index == NULL) {

flb_time_get(&tms);
gmtime_r(&tms.tm.tv_sec, &tm);
strftime(index_formatted, sizeof(index_formatted) - 1,
Expand Down Expand Up @@ -453,6 +456,37 @@ static int elasticsearch_format(struct flb_config *config,
msgpack_pack_str_body(&tmp_pck, time_formatted, s);

es_index = ctx->index;
if (ctx->ra_index) {
flb_sds_t v = flb_ra_translate(ctx->ra_index,
(char *) tag, tag_len,
map, NULL);
if (v) {
len = flb_sds_len(v);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

either dynamically allocating the index so it is not truncated or returning a warning or error might be better idea. The limit should also be checked against the actual size of index_value, (sizeof()-1), instead of a hardcoded value.

if (len > 128) {
len = 128;
}
memcpy(index_value, v, len);
index_value[len] = '\0';
es_index_custom_len = len;
flb_sds_destroy(v);
es_index = index_value;
}
if (ctx->generate_id == FLB_FALSE) {
if (ctx->suppress_type_name) {
index_len = flb_sds_snprintf (&j_index,
flb_sds_alloc (j_index),
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
ctx->es_action, es_index);
}
else {
index_len = flb_sds_snprintf (&j_index,
flb_sds_alloc (j_index),
ES_BULK_INDEX_FMT,
ctx->es_action,
es_index, ctx->type);
}
}
}
if (ctx->logstash_format == FLB_TRUE) {
ret = compose_index_header(ctx, es_index_custom_len,
&logstash_index[0], sizeof(logstash_index),
Expand Down
2 changes: 2 additions & 0 deletions plugins/out_es/es.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
struct flb_elasticsearch {
/* Elasticsearch index (database) and type (table) */
char *index;
struct flb_record_accessor *ra_index;

char *type;
char suppress_type_name;

Expand Down
19 changes: 19 additions & 0 deletions plugins/out_es/es_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,20 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
if (f_type) {
ctx->type = flb_strdup(f_type->value); /* FIXME */
}
else {
/* Check if the index has been set in the configuration */
if (ctx->index) {
/* do we have a record accessor pattern ? */
if (strchr(ctx->index, '$')) {
ctx->ra_index = flb_ra_create(ctx->index, FLB_TRUE);
if (!ctx->ra_index) {
flb_plg_error(ctx->ins, "invalid record accessor pattern set for 'index' property");
flb_es_conf_destroy(ctx);
return NULL;
}
}
}
}

/* HTTP Payload (response) maximum buffer size (0 == unlimited) */
if (ctx->buffer_size == -1) {
Expand All @@ -290,6 +304,7 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path);
}


if (ctx->id_key) {
ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE);
if (ctx->ra_id_key == NULL) {
Expand Down Expand Up @@ -498,6 +513,10 @@ int flb_es_conf_destroy(struct flb_elasticsearch *ctx)
flb_ra_destroy(ctx->ra_id_key);
ctx->ra_id_key = NULL;
}
if (ctx->ra_index) {
flb_ra_destroy(ctx->ra_index);
ctx->ra_index = NULL;
}
if (ctx->es_action) {
flb_free(ctx->es_action);
}
Expand Down
1 change: 1 addition & 0 deletions run_code_analysis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ fi
-e INPUT_DEPENDENCIES_DEBIAN="$ADDITIONAL_DEPS" \
-e INPUT_CMAKEFLAGS="$FLB_CMAKE_OPTIONS $SKIP" \
-e INPUT_PRE_COMMAND="cp -R /source /tmp" \
-e INPUT_TEST_COMMAND="${INPUT_TEST_COMMAND}" \
-e INPUT_WORKING-DIRECTORY="/tmp/source" \
lpenz/ghaction-cmake:0.19
58 changes: 58 additions & 0 deletions tests/runtime/out_elasticsearch.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,19 @@ static void cb_check_id_key(void *ctx, int ffd,
flb_free(res_data);
}

static void cb_check_index_ra(void *ctx, int ffd,
int res_ret, void *res_data, size_t res_size,
void *data)
{
char *p;
char *out_js = res_data;
char *index_line = "{\"create\":{\"_index\":\"aaa-JSON_END\",\"_type\":\"_doc\"}";

p = strstr(out_js, index_line);
TEST_CHECK(p != NULL);
flb_free(res_data);
}

void flb_test_write_operation_index()
{
int ret;
Expand Down Expand Up @@ -799,6 +812,50 @@ void flb_test_logstash_prefix_separator()
flb_destroy(ctx);
}

void flb_test_index_ra()
{
int ret;
int size = sizeof(JSON_ES) - 1;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);

/* Lib input mode */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

/* Elasticsearch output */
out_ffd = flb_output(ctx, (char *) "es", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
NULL);

/* Override defaults of index and type */
flb_output_set(ctx, out_ffd,
"index", "aaa-$END_KEY",
NULL);

/* Enable test mode */
ret = flb_output_set_test(ctx, out_ffd, "formatter",
cb_check_index_ra,
NULL, NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data sample */
flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

/* Test list */
TEST_LIST = {
{"long_index" , flb_test_long_index },
Expand All @@ -808,6 +865,7 @@ TEST_LIST = {
{"write_operation_update", flb_test_write_operation_update },
{"write_operation_upsert", flb_test_write_operation_upsert },
{"index_type" , flb_test_index_type },
{"index_ra" , flb_test_index_ra},
{"logstash_format" , flb_test_logstash_format },
{"logstash_format_nanos" , flb_test_logstash_format_nanos },
{"tag_key" , flb_test_tag_key },
Expand Down