diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index db2bcee5bd7..42a4b9ebd1a 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -295,6 +295,7 @@ static int elasticsearch_format(struct flb_config *config, size_t off = 0; size_t off_prev = 0; char *es_index; + char index_value[256]; char logstash_index[256]; char time_formatted[256]; char index_formatted[256]; @@ -350,13 +351,15 @@ static int elasticsearch_format(struct flb_config *config, } /* - * If logstash format and id generation are disabled, pre-generate - * the index line for all records. + * If logstash format, index record accessor and id generation are disabled, + * pre-generate the index line for all records. * * The header stored in 'j_index' will be used for the all records on * this payload. */ - if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) { + if (ctx->logstash_format == FLB_FALSE && + ctx->generate_id == FLB_FALSE && ctx->ra_index == NULL) { + flb_time_get(&tms); gmtime_r(&tms.tm.tv_sec, &tm); strftime(index_formatted, sizeof(index_formatted) - 1, @@ -453,6 +456,37 @@ static int elasticsearch_format(struct flb_config *config, msgpack_pack_str_body(&tmp_pck, time_formatted, s); es_index = ctx->index; + if (ctx->ra_index) { + flb_sds_t v = flb_ra_translate(ctx->ra_index, + (char *) tag, tag_len, + map, NULL); + if (v) { + len = flb_sds_len(v); + if (len > 128) { + len = 128; + } + memcpy(index_value, v, len); + index_value[len] = '\0'; + es_index_custom_len = len; + flb_sds_destroy(v); + es_index = index_value; + } + if (ctx->generate_id == FLB_FALSE) { + if (ctx->suppress_type_name) { + index_len = flb_sds_snprintf (&j_index, + flb_sds_alloc (j_index), + ES_BULK_INDEX_FMT_WITHOUT_TYPE, + ctx->es_action, es_index); + } + else { + index_len = flb_sds_snprintf (&j_index, + flb_sds_alloc (j_index), + ES_BULK_INDEX_FMT, + ctx->es_action, + es_index, ctx->type); + } + } + } if (ctx->logstash_format == FLB_TRUE) { ret = compose_index_header(ctx, es_index_custom_len, &logstash_index[0], sizeof(logstash_index), diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index 5d187049f26..1de0dff68a3 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -39,6 +39,8 @@ struct flb_elasticsearch { /* Elasticsearch index (database) and type (table) */ char *index; + struct flb_record_accessor *ra_index; + char *type; char suppress_type_name; diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index 48c8c3e2516..02f4242efb4 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -269,6 +269,20 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, if (f_type) { ctx->type = flb_strdup(f_type->value); /* FIXME */ } + else { + /* Check if the index has been set in the configuration */ + if (ctx->index) { + /* do we have a record accessor pattern ? */ + if (strchr(ctx->index, '$')) { + ctx->ra_index = flb_ra_create(ctx->index, FLB_TRUE); + if (!ctx->ra_index) { + flb_plg_error(ctx->ins, "invalid record accessor pattern set for 'index' property"); + flb_es_conf_destroy(ctx); + return NULL; + } + } + } + } /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ if (ctx->buffer_size == -1) { @@ -290,6 +304,7 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path); } + if (ctx->id_key) { ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE); if (ctx->ra_id_key == NULL) { @@ -498,6 +513,10 @@ int flb_es_conf_destroy(struct flb_elasticsearch *ctx) flb_ra_destroy(ctx->ra_id_key); ctx->ra_id_key = NULL; } + if (ctx->ra_index) { + flb_ra_destroy(ctx->ra_index); + ctx->ra_index = NULL; + } if (ctx->es_action) { flb_free(ctx->es_action); } diff --git a/run_code_analysis.sh b/run_code_analysis.sh index 22adc47f7ef..9d59d9fd1e3 100755 --- a/run_code_analysis.sh +++ b/run_code_analysis.sh @@ -37,5 +37,6 @@ fi -e INPUT_DEPENDENCIES_DEBIAN="$ADDITIONAL_DEPS" \ -e INPUT_CMAKEFLAGS="$FLB_CMAKE_OPTIONS $SKIP" \ -e INPUT_PRE_COMMAND="cp -R /source /tmp" \ + -e INPUT_TEST_COMMAND="${INPUT_TEST_COMMAND}" \ -e INPUT_WORKING-DIRECTORY="/tmp/source" \ lpenz/ghaction-cmake:0.19 diff --git a/tests/runtime/out_elasticsearch.c b/tests/runtime/out_elasticsearch.c index 9efe7610a95..b0cde947551 100644 --- a/tests/runtime/out_elasticsearch.c +++ b/tests/runtime/out_elasticsearch.c @@ -167,6 +167,19 @@ static void cb_check_id_key(void *ctx, int ffd, flb_free(res_data); } +static void cb_check_index_ra(void *ctx, int ffd, + int res_ret, void *res_data, size_t res_size, + void *data) +{ + char *p; + char *out_js = res_data; + char *index_line = "{\"create\":{\"_index\":\"aaa-JSON_END\",\"_type\":\"_doc\"}"; + + p = strstr(out_js, index_line); + TEST_CHECK(p != NULL); + flb_free(res_data); +} + void flb_test_write_operation_index() { int ret; @@ -799,6 +812,50 @@ void flb_test_logstash_prefix_separator() flb_destroy(ctx); } +void flb_test_index_ra() +{ + int ret; + int size = sizeof(JSON_ES) - 1; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override defaults of index and type */ + flb_output_set(ctx, out_ffd, + "index", "aaa-$END_KEY", + NULL); + + /* Enable test mode */ + ret = flb_output_set_test(ctx, out_ffd, "formatter", + cb_check_index_ra, + NULL, NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + /* Test list */ TEST_LIST = { {"long_index" , flb_test_long_index }, @@ -808,6 +865,7 @@ TEST_LIST = { {"write_operation_update", flb_test_write_operation_update }, {"write_operation_upsert", flb_test_write_operation_upsert }, {"index_type" , flb_test_index_type }, + {"index_ra" , flb_test_index_ra}, {"logstash_format" , flb_test_logstash_format }, {"logstash_format_nanos" , flb_test_logstash_format_nanos }, {"tag_key" , flb_test_tag_key },