diff --git a/tests/internal/stream_processor.c b/tests/internal/stream_processor.c index 64679027862..c9e59f50407 100644 --- a/tests/internal/stream_processor.c +++ b/tests/internal/stream_processor.c @@ -23,10 +23,12 @@ #include #include #include +#include #include #include #include #include +#include #include "flb_tests_internal.h" #include "include/sp_invalid_queries.h" @@ -107,7 +109,7 @@ int flb_sp_do_test(struct flb_sp *sp, struct flb_sp_task *task, if (task->aggregate_keys == FLB_TRUE) { ret = sp_process_data_aggr(data_buf->buffer, data_buf->size, tag, tag_len, - task, sp); + task, sp, FLB_TRUE); if (ret == -1) { flb_error("[sp] error error processing records for '%s'", task->name); @@ -119,8 +121,7 @@ int flb_sp_do_test(struct flb_sp *sp, struct flb_sp_task *task, task->name); return -1; } - - if (task->window.type == FLB_SP_WINDOW_DEFAULT) { + if (task->window.type == FLB_SP_WINDOW_DEFAULT || task->window.type == FLB_SP_WINDOW_TUMBLING) { package_results(tag, tag_len, &out_buf->buffer, &out_buf->size, task); } @@ -751,11 +752,143 @@ static void test_snapshot() #endif } +static void test_conv_from_str_to_num() +{ + struct flb_config *config = NULL; + struct flb_sp *sp = NULL; + struct flb_sp_task *task = NULL; + struct sp_buffer out_buf; + struct sp_buffer data_buf; + msgpack_sbuffer sbuf; + msgpack_packer pck; + msgpack_unpacked result; + size_t off = 0; + char json[4096] = {0}; + int ret; + +#ifdef _WIN32 + WSADATA wsa_data; + + WSAStartup(0x0201, &wsa_data); +#endif + out_buf.buffer = NULL; + + config = flb_config_init(); + config->evl = mk_event_loop_create(256); + + ret = flb_storage_create(config); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("flb_storage_create failed"); + flb_config_exit(config); + return; + } + + sp = flb_sp_create(config); + if (!TEST_CHECK(sp != NULL)) { + TEST_MSG("[sp test] cannot create stream processor context"); + goto test_conv_from_str_to_num_end; + } + + task = flb_sp_task_create(sp, "tail.0", "CREATE STREAM test WITH (tag=\'test\') AS SELECT word, num, COUNT(*) FROM STREAM:tail.0 WINDOW TUMBLING (1 SECOND) GROUP BY word, num;"); + if (!TEST_CHECK(task != NULL)) { + TEST_MSG("[sp test] wrong check 'conv', fix it!"); + goto test_conv_from_str_to_num_end; + } + + /* Create input data */ + msgpack_sbuffer_init(&sbuf); + msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write); + msgpack_pack_array(&pck, 2); + flb_pack_time_now(&pck); + msgpack_pack_map(&pck, 2); + + msgpack_pack_str(&pck, 4); + msgpack_pack_str_body(&pck, "word", 4); + msgpack_pack_str(&pck, 4); + msgpack_pack_str_body(&pck, "hoge", 4); + + msgpack_pack_str(&pck, 3); + msgpack_pack_str_body(&pck, "num", 3); + msgpack_pack_str(&pck, 6); + msgpack_pack_str_body(&pck, "123456", 6); + + data_buf.buffer = sbuf.data; + data_buf.size = sbuf.size; + + out_buf.buffer = NULL; + out_buf.size = 0; + + /* Exec stream processor */ + ret = flb_sp_do_test(sp, task, "tail.0", strlen("tail.0"), &data_buf, &out_buf); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("flb_sp_do_test failed"); + msgpack_sbuffer_destroy(&sbuf); + goto test_conv_from_str_to_num_end; + } + + if (!TEST_CHECK(out_buf.size > 0)) { + TEST_MSG("out_buf size is 0"); + msgpack_sbuffer_destroy(&sbuf); + goto test_conv_from_str_to_num_end; + } + + + /* Check output buffer. It should contain a number 123456 not a string "123456" */ + + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, out_buf.buffer, out_buf.size, &off); + if (!TEST_CHECK(ret == MSGPACK_UNPACK_SUCCESS)) { + TEST_MSG("failed to unpack ret=%d", ret); + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&sbuf); + goto test_conv_from_str_to_num_end; + } + + ret = flb_msgpack_to_json(&json[0], sizeof(json), &result.data); + if (!TEST_CHECK(ret > 0)) { + TEST_MSG("flb_msgpack_to_json failed"); + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&sbuf); + goto test_conv_from_str_to_num_end; + } + + if (!TEST_CHECK(strstr(json,"123456") != NULL)) { + TEST_MSG("number not found"); + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&sbuf); + goto test_conv_from_str_to_num_end; + } + if (!TEST_CHECK(strstr(json,"\"123456\"") == NULL)) { + TEST_MSG("output should be number type"); + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&sbuf); + goto test_conv_from_str_to_num_end; + } + + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&sbuf); + + test_conv_from_str_to_num_end: + if (out_buf.buffer != NULL) { + flb_free(out_buf.buffer); + } + +#ifdef _WIN32 + WSACleanup(); +#endif + if (sp != NULL) { + flb_sp_destroy(sp); + } + flb_storage_destroy(config); + flb_config_exit(config); +} + TEST_LIST = { { "invalid_queries", invalid_queries}, { "select_keys", test_select_keys}, { "select_subkeys", test_select_subkeys}, { "window", test_window}, { "snapshot", test_snapshot}, + { "conv_from_str_to_num", test_conv_from_str_to_num}, { NULL } };