From 729f09deb8f2bf595e02443fe4768f5e4a41eb48 Mon Sep 17 00:00:00 2001 From: juntao-lei-timeplus Date: Fri, 26 Apr 2024 19:11:11 +0800 Subject: [PATCH 1/7] add unit test for streaming aggregator, finished global and window count,append only mode. --- .../tests/gtest_streaming_aggregator.cpp | 389 ++++++++++++++++++ 1 file changed, 389 insertions(+) create mode 100644 src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp diff --git a/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp b/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp new file mode 100644 index 0000000000..7843082ebe --- /dev/null +++ b/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp @@ -0,0 +1,389 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "Columns/ColumnString.h" +#include "Columns/ColumnsNumber.h" +#include "Core/ColumnNumbers.h" +#include "Core/NamesAndTypes.h" +#include "Core/iostream_debug_helpers.h" +#include "DataTypes/DataTypeDateTime.h" +#include "DataTypes/DataTypeDateTime64.h" +#include "DataTypes/DataTypeFixedString.h" +#include "DataTypes/DataTypeInterval.h" +#include "DataTypes/DataTypeString.h" +#include "DataTypes/DataTypesNumber.h" +#include "DataTypes/IDataType.h" +#include "Interpreters/Streaming/TableFunctionDescription.h" +#include "Interpreters/Streaming/TableFunctionDescription_fwd.h" + +namespace DB +{ +using namespace DB; + +void prepareHeader(Block & header) +{ + std::vector columns; + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int8"), fmt::format("int8", 0))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int16"), fmt::format("num16", 1))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int32"), fmt::format("num32", 2))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int64"), fmt::format("num64", 3))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int128"), fmt::format("num128", 4))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int256"), fmt::format("num256", 5))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int8)"), fmt::format("low_int8", 6))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int16)"), fmt::format("low_int16", 7))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int32)"), fmt::format("low_int32", 8))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int64)"), fmt::format("low_int64", 9))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int128)"), fmt::format("low_int128", 10))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int256)"), fmt::format("low_int256", 11))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(string)"), fmt::format("low_str", 12))); + columns.push_back( + ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(fixed_string(3))"), fmt::format("low_fixed_str", 13))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("string"), fmt::format("str", 14))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("fixed_string(3)"), fmt::format("fixed_str", 15))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("nullable(int8)"), fmt::format("nullable", 16))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("datetime"), fmt::format("window_start", 17))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("datetime"), fmt::format("window_end", 18))); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("datetime"), fmt::format("_tp_time", 19))); + for (auto & column : columns) + { + header.insert(column); + } +} + +void prepareAggregates(AggregateDescriptions & aggregates) +{ + tryRegisterAggregateFunctions(); + DataTypes insert_data_types = {std::make_shared()}; + Array param; + DataTypes result_data_types = {std::make_shared()}; + ColumnNumbers arguments = {}; + AggregateFunctionProperties properties = {false, false}; + AggregateDescription aggregate{ + AggregateFunctionFactory::instance().get("count", result_data_types, param, properties, false), param, arguments, {}, "count()"}; + aggregates.push_back(aggregate); +} + +std::shared_ptr prepareParams(size_t max_threads, [[maybe_unused]] std::vector & key_sites) +{ + Block src_header; + prepareHeader(src_header); + ColumnNumbers keys = key_sites; + AggregateDescriptions aggregates; + prepareAggregates(aggregates); + bool overflow_row{false}; + size_t max_rows_to_group_by{0}; + OverflowMode group_by_overflow_mode{OverflowMode::THROW}; + size_t group_by_two_level_threshold{0}; + size_t group_by_two_level_threshold_bytes{0}; + size_t max_bytes_before_external_group_by{0}; + bool empty_result_for_aggregation_by_empty_set{false}; + VolumePtr tmp_volume = nullptr; + size_t min_free_disk_space{0}; + bool compile_aggregate_expressions{true}; + size_t min_count_to_compile_aggregate_expression{3}; + size_t max_block_size{65409}; + Block intermediate_header; + bool keep_state{true}; + size_t streaming_window_count{0}; + Streaming::Aggregator::Params::GroupBy streaming_group_by{Streaming::Aggregator::Params::GroupBy::OTHER}; + ssize_t delta_col_pos{-1}; + size_t window_keys_num{0}; + Streaming::WindowParamsPtr window_params{nullptr}; + Streaming::TrackingUpdatesType tracking_updates_type{Streaming::TrackingUpdatesType::None}; + return make_shared( + src_header, + keys, + aggregates, + overflow_row, + max_rows_to_group_by, + group_by_overflow_mode, + group_by_two_level_threshold, + group_by_two_level_threshold_bytes, + max_bytes_before_external_group_by, + empty_result_for_aggregation_by_empty_set, + tmp_volume, + max_threads, + min_free_disk_space, + compile_aggregate_expressions, + min_count_to_compile_aggregate_expression, + max_block_size, + intermediate_header, + keep_state, + streaming_window_count, + streaming_group_by, + delta_col_pos, + window_keys_num, + window_params, + tracking_updates_type); +} + +template +auto createLowColumn(Args &&... args) +{ + auto int_data_type = std::make_shared(std::forward(args)...); + auto low_cardinality_data_type = std::make_shared(int_data_type); + return low_cardinality_data_type->createColumn(); +}; + +void setColumnsData(Columns & columns, size_t & num_rows, [[maybe_unused]] std::vector & aggr_columns) +{ + auto c_int_8 = ColumnInt8::create(); + auto c_int_16 = ColumnInt16::create(); + auto c_int_32 = ColumnInt32::create(); + auto c_int_64 = ColumnInt64::create(); + auto c_int_128 = ColumnInt128::create(); + auto c_int_256 = ColumnInt256::create(); + auto c_low_int8 = createLowColumn(); + auto c_low_int16 = createLowColumn(); + auto c_low_int32 = createLowColumn(); + auto c_low_int64 = createLowColumn(); + auto c_low_int128 = createLowColumn(); + auto c_low_int256 = createLowColumn(); + auto c_low_str = createLowColumn(); + auto low_fixed_date_type = std::make_shared(std::make_shared(3)); + auto c_low_fixed_str = low_fixed_date_type->createColumn(); + auto c_str = ColumnString::create(); + auto c_fixed_str = ColumnFixedString::create(3); + auto c_nullable = ColumnNullable::create(ColumnInt8::create(), ColumnUInt8::create()); + auto time_date_type = std::make_shared(std::make_shared()); + auto s_time_date_type = std::make_shared(std::make_shared()); + auto e_time_date_type = std::make_shared(std::make_shared()); + auto c_datetime64 = time_date_type->createColumn(); + auto c_time_start = s_time_date_type->createColumn(); + auto c_time_end = e_time_date_type->createColumn(); + for (size_t i = 0; i < 10; ++i) + { + c_int_8->insertValue(8); + c_int_16->insertValue(8); + c_int_32->insertValue(8); + c_int_64->insertValue(8); + c_int_128->insertValue(8); + c_int_256->insertValue(8); + c_low_int8->insert(8); + c_low_int16->insert(8); + c_low_int32->insert(8); + c_low_int64->insert(8); + c_low_int128->insert(8); + c_low_int256->insert(8); + c_low_str->insertData("str", 3); + c_low_fixed_str->insertData("str", 3); + c_str->insertData("str", 3); + c_fixed_str->insertData("str", 3); + c_nullable->insert(8); + c_datetime64->insert(3); + c_time_start->insert(0); + c_time_end->insert(5); + } + num_rows = 10; + columns.push_back(std::move(c_int_8)); + columns.push_back(std::move(c_int_16)); + columns.push_back(std::move(c_int_32)); + columns.push_back(std::move(c_int_64)); + columns.push_back(std::move(c_int_128)); + columns.push_back(std::move(c_int_256)); + columns.push_back(std::move(c_low_int8)); + columns.push_back(std::move(c_low_int16)); + columns.push_back(std::move(c_low_int32)); + columns.push_back(std::move(c_low_int64)); + columns.push_back(std::move(c_low_int128)); + columns.push_back(std::move(c_low_int256)); + columns.push_back(std::move(c_low_str)); + columns.push_back(std::move(c_low_fixed_str)); + columns.push_back(std::move(c_str)); + columns.push_back(std::move(c_fixed_str)); + columns.push_back(std::move(c_nullable)); + columns.push_back(std::move(c_time_start)); + columns.push_back(std::move(c_time_end)); + columns.push_back(std::move(c_datetime64)); +} + +TEST(StreamingAggregation, globalcount) +{ + auto context = getContext().context; + tryRegisterFormats(); + tryRegisterAggregateFunctions(); + size_t max_threads = 10; + + size_t num_rows{0}; + Columns columns; + std::vector aggr_columns; + setColumnsData(columns, num_rows, aggr_columns); + + for (size_t i = 0; i < 18; ++i) + { + std::vector key_sites; + if (i == 17) + { + key_sites.push_back(4); + key_sites.push_back(16); + } + else + { + key_sites.push_back(i); + } + // global aggr + std::shared_ptr params; + params = prepareParams(max_threads, key_sites); + Streaming::Aggregator aggregator(*params); + + ColumnRawPtrs key_columns; + + for (auto & key_site : key_sites) + { + key_columns.push_back(columns[key_site].get()); + } + + Aggregator::AggregateColumns aggregate_columns{aggr_columns}; + + size_t row_begin = 0; + size_t row_end = num_rows; + Streaming::AggregatedDataVariants hash_map; + aggregator.executeOnBlock(columns, row_begin, row_end, hash_map, key_columns, aggregate_columns); + + auto blocks = aggregator.convertToBlocks(hash_map, true, max_threads); + // auto + if (i >= 12 && i <= 15) + EXPECT_EQ(blocks.begin()->getByPosition(0).column->getDataAt(0), "str"); + else + EXPECT_EQ(blocks.begin()->getByPosition(0).column->get64(0), 8); + if (i == 17) + { + EXPECT_EQ(blocks.begin()->getByPosition(1).column->get64(0), 8); + EXPECT_EQ(blocks.begin()->getByPosition(2).column->get64(0), 10); + } + else + EXPECT_EQ(blocks.begin()->getByPosition(1).column->get64(0), 10); + } +} + +TEST(StreamingAggregation, windowcount) +{ + auto context = getContext().context; + tryRegisterFormats(); + tryRegisterAggregateFunctions(); + size_t max_threads = 10; + + size_t num_rows{0}; + Columns columns; + std::vector aggr_columns; + setColumnsData(columns, num_rows, aggr_columns); + + for (size_t i = 0; i < 18; ++i) + { + std::vector key_sites; + key_sites.push_back(17); + if (i != 0) + key_sites.push_back(18); + + if (i == 17) + { + key_sites.push_back(4); + key_sites.push_back(16); + } + else if (i == 3 || i == 4 || i == 9 || i == 10 || i == 16) + { + key_sites.push_back(i); + } + else if (i != 0 && i != 1) + continue; + // window aggr + std::shared_ptr params; + params = prepareParams(max_threads, key_sites); + params->group_by = {Streaming::Aggregator::Params::GroupBy::WINDOW_END}; + params->window_keys_num = 2; + if (i == 0) + params->window_keys_num = 1; + ParserFunction func_parser; + auto ast = parseQuery(func_parser, "tumble(stream, 5s)", 0, 10000); + NamesAndTypesList columns_list; + if (i == 0) + columns_list = {{"window_start", std::make_shared(3, "UTC")}}; + columns_list.push_back({"window_end", std::make_shared(3, "UTC")}); + Streaming::TableFunctionDescriptionPtr table_function_description = std::make_shared( + ast, + Streaming::WindowType::Tumble, + Names{"_tp_time", "5s"}, + DataTypes{std::make_shared(3, "UTC"), std::make_shared(IntervalKind::Second)}, + std::shared_ptr(), + Names{"_tp_time"}, + columns_list); + Streaming::WindowParamsPtr window_params = Streaming::WindowParams::create(table_function_description); + + params->window_params = window_params; + Streaming::Aggregator aggregator(*params); + + ColumnRawPtrs key_columns; + + for (auto & key_site : key_sites) + { + key_columns.push_back(columns[key_site].get()); + } + + Aggregator::AggregateColumns aggregate_columns{aggr_columns}; + + size_t row_begin = 0; + size_t row_end = num_rows; + Streaming::AggregatedDataVariants hash_map; + aggregator.executeOnBlock(columns, row_begin, row_end, hash_map, key_columns, aggregate_columns); + + auto blocks = aggregator.convertToBlocks(hash_map, true, max_threads); + + + if (i == 0) + { + EXPECT_EQ(blocks.begin()->getByPosition(1).column->get64(0), 10); + continue; + } + if (i == 1) + { + EXPECT_EQ(blocks.begin()->getByPosition(2).column->get64(0), 10); + continue; + } + EXPECT_EQ(blocks.begin()->getByPosition(2).column->get64(0), 8); + if (i == 3 || i == 4 || i == 9 || i == 10 || i == 16) + { + EXPECT_EQ(blocks.begin()->getByPosition(3).column->get64(0), 10); + } + if (i == 17) + { + EXPECT_EQ(blocks.begin()->getByPosition(3).column->get64(0), 8); + EXPECT_EQ(blocks.begin()->getByPosition(4).column->get64(0), 10); + } + } +} + +}; \ No newline at end of file From 54f1ebad1135fd22faa16abf3ca31b769e46eaf3 Mon Sep 17 00:00:00 2001 From: juntao-lei-timeplus Date: Sun, 28 Apr 2024 16:09:51 +0800 Subject: [PATCH 2/7] Refactoring code --- .../tests/gtest_streaming_aggregator.cpp | 414 +++++++++--------- 1 file changed, 216 insertions(+), 198 deletions(-) diff --git a/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp b/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp index 7843082ebe..b79a08dc8c 100644 --- a/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp +++ b/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp @@ -1,87 +1,42 @@ -#include -#include -#include -#include -#include #include -#include -#include -#include #include +#include #include -#include -#include -#include -#include -#include -#include +#include #include -#include -#include -#include -#include -#include -#include -#include -#include #include -#include #include -#include -#include -#include -#include #include -#include #include #include -#include "Columns/ColumnString.h" -#include "Columns/ColumnsNumber.h" -#include "Core/ColumnNumbers.h" -#include "Core/NamesAndTypes.h" -#include "Core/iostream_debug_helpers.h" -#include "DataTypes/DataTypeDateTime.h" -#include "DataTypes/DataTypeDateTime64.h" -#include "DataTypes/DataTypeFixedString.h" #include "DataTypes/DataTypeInterval.h" -#include "DataTypes/DataTypeString.h" -#include "DataTypes/DataTypesNumber.h" -#include "DataTypes/IDataType.h" #include "Interpreters/Streaming/TableFunctionDescription.h" -#include "Interpreters/Streaming/TableFunctionDescription_fwd.h" - namespace DB { -using namespace DB; - -void prepareHeader(Block & header) +Block prepareHeader() { std::vector columns; - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int8"), fmt::format("int8", 0))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int16"), fmt::format("num16", 1))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int32"), fmt::format("num32", 2))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int64"), fmt::format("num64", 3))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int128"), fmt::format("num128", 4))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int256"), fmt::format("num256", 5))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int8)"), fmt::format("low_int8", 6))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int16)"), fmt::format("low_int16", 7))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int32)"), fmt::format("low_int32", 8))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int64)"), fmt::format("low_int64", 9))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int128)"), fmt::format("low_int128", 10))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int256)"), fmt::format("low_int256", 11))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(string)"), fmt::format("low_str", 12))); - columns.push_back( - ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(fixed_string(3))"), fmt::format("low_fixed_str", 13))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("string"), fmt::format("str", 14))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("fixed_string(3)"), fmt::format("fixed_str", 15))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("nullable(int8)"), fmt::format("nullable", 16))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("datetime"), fmt::format("window_start", 17))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("datetime"), fmt::format("window_end", 18))); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("datetime"), fmt::format("_tp_time", 19))); - for (auto & column : columns) - { - header.insert(column); - } + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int8"), "int8")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int16"), "int16")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int32"), "int32")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int64"), "int64")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int128"), "int128")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int256"), "int256")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int8)"), "low_int8")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int16)"), "low_int16")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int32)"), "low_int32")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int64)"), "low_int64")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int128)"), "low_int128")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(int256)"), "low_int256")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(string)"), "low_str")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("low_cardinality(fixed_string(3))"), "low_fixed_str")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("string"), "str")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("fixed_string(3)"), "fixed_str")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("nullable(int8)"), "nullable")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("datetime"), "window_start")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("datetime"), "window_end")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("datetime"), "_tp_time")); + return Block(std::move(columns)); } void prepareAggregates(AggregateDescriptions & aggregates) @@ -97,11 +52,10 @@ void prepareAggregates(AggregateDescriptions & aggregates) aggregates.push_back(aggregate); } -std::shared_ptr prepareParams(size_t max_threads, [[maybe_unused]] std::vector & key_sites) +std::shared_ptr prepareParams(size_t max_threads, [[maybe_unused]] std::vector & key_site) { - Block src_header; - prepareHeader(src_header); - ColumnNumbers keys = key_sites; + Block src_header = prepareHeader(); + ColumnNumbers keys = key_site; AggregateDescriptions aggregates; prepareAggregates(aggregates); bool overflow_row{false}; @@ -159,7 +113,7 @@ auto createLowColumn(Args &&... args) return low_cardinality_data_type->createColumn(); }; -void setColumnsData(Columns & columns, size_t & num_rows, [[maybe_unused]] std::vector & aggr_columns) +void setColumnsData(Columns & columns, [[maybe_unused]] std::vector & aggr_columns) { auto c_int_8 = ColumnInt8::create(); auto c_int_16 = ColumnInt16::create(); @@ -208,7 +162,6 @@ void setColumnsData(Columns & columns, size_t & num_rows, [[maybe_unused]] std:: c_time_start->insert(0); c_time_end->insert(5); } - num_rows = 10; columns.push_back(std::move(c_int_8)); columns.push_back(std::move(c_int_16)); columns.push_back(std::move(c_int_32)); @@ -231,158 +184,223 @@ void setColumnsData(Columns & columns, size_t & num_rows, [[maybe_unused]] std:: columns.push_back(std::move(c_datetime64)); } -TEST(StreamingAggregation, globalcount) +void initAggregation() { - auto context = getContext().context; tryRegisterFormats(); tryRegisterAggregateFunctions(); - size_t max_threads = 10; +} + +std::vector> prepareGlobalCountKeys() +{ + std::vector> key_sites{ + {0}, + {1}, + {2}, + {3}, + {4}, + {5}, + {6}, + {7}, + {8}, + {9}, + {10}, + {11}, + {12}, + {13}, + {14}, + {15}, + {16}, + {4, 16}, + }; + return key_sites; +} - size_t num_rows{0}; - Columns columns; - std::vector aggr_columns; - setColumnsData(columns, num_rows, aggr_columns); +std::vector> prepareWindowCountKeys() +{ + std::vector> key_sites{ + {17}, + {17, 18}, + {17, 18, 3}, + {17, 18, 4}, + {17, 18, 9}, + {17, 18, 10}, + {17, 18, 16}, + {17, 18, 4, 16}, + }; + return key_sites; +} + +using ResultType = std::variant; + +std::vector>> prepareGlobalResult() +{ + std::vector>> result{ + {{0, size_t(8)}, {1, size_t(10)}}, + {{0, size_t(8)}, {1, size_t(10)}}, + {{0, size_t(8)}, {1, size_t(10)}}, + {{0, size_t(8)}, {1, size_t(10)}}, + {{0, size_t(8)}, {1, size_t(10)}}, + {{0, size_t(8)}, {1, size_t(10)}}, + {{0, size_t(8)}, {1, size_t(10)}}, + {{0, size_t(8)}, {1, size_t(10)}}, + {{0, size_t(8)}, {1, size_t(10)}}, + {{0, size_t(8)}, {1, size_t(10)}}, + {{0, size_t(8)}, {1, size_t(10)}}, + {{0, size_t(8)}, {1, size_t(10)}}, + {{0, std::string("str")}, {1, size_t(10)}}, + {{0, std::string("str")}, {1, size_t(10)}}, + {{0, std::string("str")}, {1, size_t(10)}}, + {{0, std::string("str")}, {1, size_t(10)}}, + {{0, size_t(8)}, {1, size_t(10)}}, + {{0, size_t(8)}, {1, size_t(8)}, {2, size_t(10)}}, + }; + return result; +}; + +std::vector>> prepareWindowResult() +{ + std::vector>> result{ + {{1, size_t(10)}}, + {{2, size_t(10)}}, + {{2, size_t(8)}, {3, size_t(10)}}, + {{2, size_t(8)}, {3, size_t(10)}}, + {{2, size_t(8)}, {3, size_t(10)}}, + {{2, size_t(8)}, {3, size_t(10)}}, + {{2, size_t(8)}, {3, size_t(10)}}, + {{2, size_t(8)}, {3, size_t(8)}, {4, size_t(10)}}, + }; + return result; +}; - for (size_t i = 0; i < 18; ++i) +void checkGlobalAggregationResult(BlocksList & blocks, std::vector>> & results, size_t position) +{ + EXPECT_EQ(blocks.size(), 1); + auto & result = results[position]; + for (auto & [site, value] : result) { - std::vector key_sites; - if (i == 17) + if (std::holds_alternative(value)) { - key_sites.push_back(4); - key_sites.push_back(16); + EXPECT_EQ(blocks.begin()->getByPosition(site).column->get64(0), std::get(value)); } else { - key_sites.push_back(i); + EXPECT_EQ(blocks.begin()->getByPosition(site).column->getDataAt(0), std::get(value)); } - // global aggr - std::shared_ptr params; - params = prepareParams(max_threads, key_sites); - Streaming::Aggregator aggregator(*params); + } +} - ColumnRawPtrs key_columns; +void checkWindowAggregationResult(BlocksList & blocks, std::vector>> & results, size_t position) +{ + EXPECT_EQ(blocks.size(), 1); + auto & result = results[position]; - for (auto & key_site : key_sites) + for (auto & [site, value] : result) + { + if (std::holds_alternative(value)) { - key_columns.push_back(columns[key_site].get()); + EXPECT_EQ(blocks.begin()->getByPosition(site).column->get64(0), std::get(value)); } - - Aggregator::AggregateColumns aggregate_columns{aggr_columns}; - - size_t row_begin = 0; - size_t row_end = num_rows; - Streaming::AggregatedDataVariants hash_map; - aggregator.executeOnBlock(columns, row_begin, row_end, hash_map, key_columns, aggregate_columns); - - auto blocks = aggregator.convertToBlocks(hash_map, true, max_threads); - // auto - if (i >= 12 && i <= 15) - EXPECT_EQ(blocks.begin()->getByPosition(0).column->getDataAt(0), "str"); else - EXPECT_EQ(blocks.begin()->getByPosition(0).column->get64(0), 8); - if (i == 17) { - EXPECT_EQ(blocks.begin()->getByPosition(1).column->get64(0), 8); - EXPECT_EQ(blocks.begin()->getByPosition(2).column->get64(0), 10); + EXPECT_EQ(blocks.begin()->getByPosition(site).column->getDataAt(0), std::get(value)); } - else - EXPECT_EQ(blocks.begin()->getByPosition(1).column->get64(0), 10); } } -TEST(StreamingAggregation, windowcount) +std::shared_ptr prepareGlobalAggregator( + size_t max_threads, + std::vector & key_site, + ColumnRawPtrs & key_columns, + [[maybe_unused]] ColumnRawPtrs & aggr_columns, + Columns & columns) { - auto context = getContext().context; - tryRegisterFormats(); - tryRegisterAggregateFunctions(); - size_t max_threads = 10; + setColumnsData(columns, aggr_columns); + std::shared_ptr params = prepareParams(max_threads, key_site); + Streaming::Aggregator aggregator(*params); + for (auto & site : key_site) + { + key_columns.push_back(columns[site].get()); + } + return params; +} - size_t num_rows{0}; - Columns columns; - std::vector aggr_columns; - setColumnsData(columns, num_rows, aggr_columns); +std::shared_ptr prepareWindowAggregator( + size_t max_threads, + std::vector & key_site, + ColumnRawPtrs & key_columns, + [[maybe_unused]] ColumnRawPtrs & aggr_columns, + Columns & columns) +{ + setColumnsData(columns, aggr_columns); + std::shared_ptr params = prepareParams(max_threads, key_site); + params->group_by = {Streaming::Aggregator::Params::GroupBy::WINDOW_END}; + params->window_keys_num = 2; + if (key_site.size() == 1) + params->window_keys_num = 1; + ParserFunction func_parser; + auto ast = parseQuery(func_parser, "tumble(stream, 5s)", 0, 10000); + NamesAndTypesList columns_list; + if (key_site.size() == 1) + columns_list = {{"window_start", std::make_shared(3, "UTC")}}; + columns_list.push_back({"window_end", std::make_shared(3, "UTC")}); + Streaming::TableFunctionDescriptionPtr table_function_description = std::make_shared( + ast, + Streaming::WindowType::Tumble, + Names{"_tp_time", "5s"}, + DataTypes{std::make_shared(3, "UTC"), std::make_shared(IntervalKind::Second)}, + std::shared_ptr(), + Names{"_tp_time"}, + columns_list); + Streaming::WindowParamsPtr window_params = Streaming::WindowParams::create(table_function_description); - for (size_t i = 0; i < 18; ++i) + params->window_params = window_params; + Streaming::Aggregator aggregator(*params); + for (auto & site : key_site) { - std::vector key_sites; - key_sites.push_back(17); - if (i != 0) - key_sites.push_back(18); - - if (i == 17) - { - key_sites.push_back(4); - key_sites.push_back(16); - } - else if (i == 3 || i == 4 || i == 9 || i == 10 || i == 16) - { - key_sites.push_back(i); - } - else if (i != 0 && i != 1) - continue; - // window aggr - std::shared_ptr params; - params = prepareParams(max_threads, key_sites); - params->group_by = {Streaming::Aggregator::Params::GroupBy::WINDOW_END}; - params->window_keys_num = 2; - if (i == 0) - params->window_keys_num = 1; - ParserFunction func_parser; - auto ast = parseQuery(func_parser, "tumble(stream, 5s)", 0, 10000); - NamesAndTypesList columns_list; - if (i == 0) - columns_list = {{"window_start", std::make_shared(3, "UTC")}}; - columns_list.push_back({"window_end", std::make_shared(3, "UTC")}); - Streaming::TableFunctionDescriptionPtr table_function_description = std::make_shared( - ast, - Streaming::WindowType::Tumble, - Names{"_tp_time", "5s"}, - DataTypes{std::make_shared(3, "UTC"), std::make_shared(IntervalKind::Second)}, - std::shared_ptr(), - Names{"_tp_time"}, - columns_list); - Streaming::WindowParamsPtr window_params = Streaming::WindowParams::create(table_function_description); + key_columns.push_back(columns[site].get()); + } + return params; +} - params->window_params = window_params; +TEST(StreamingAggregation, globalcount) +{ + initAggregation(); + auto key_sites = prepareGlobalCountKeys(); + auto results = prepareGlobalResult(); + size_t position = 0; + for (auto & key_site : key_sites) + { + Columns columns; + ColumnRawPtrs key_columns, aggregate_columns; + Streaming::AggregatedDataVariants hash_map; + std::shared_ptr params + = prepareGlobalAggregator(10, key_site, key_columns, aggregate_columns, columns); Streaming::Aggregator aggregator(*params); + Aggregator::AggregateColumns aggregate_column{aggregate_columns}; + aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); + auto blocks = aggregator.convertToBlocks(hash_map, true, 10); + checkGlobalAggregationResult(blocks, results, position++); + } +} - ColumnRawPtrs key_columns; - - for (auto & key_site : key_sites) - { - key_columns.push_back(columns[key_site].get()); - } - - Aggregator::AggregateColumns aggregate_columns{aggr_columns}; - - size_t row_begin = 0; - size_t row_end = num_rows; +TEST(StreamingAggregation, windowcount) +{ + initAggregation(); + auto key_sites = prepareWindowCountKeys(); + auto results = prepareWindowResult(); + size_t position = 0; + for (auto & key_site : key_sites) + { + Columns columns; + ColumnRawPtrs key_columns, aggregate_columns; Streaming::AggregatedDataVariants hash_map; - aggregator.executeOnBlock(columns, row_begin, row_end, hash_map, key_columns, aggregate_columns); - - auto blocks = aggregator.convertToBlocks(hash_map, true, max_threads); - - - if (i == 0) - { - EXPECT_EQ(blocks.begin()->getByPosition(1).column->get64(0), 10); - continue; - } - if (i == 1) - { - EXPECT_EQ(blocks.begin()->getByPosition(2).column->get64(0), 10); - continue; - } - EXPECT_EQ(blocks.begin()->getByPosition(2).column->get64(0), 8); - if (i == 3 || i == 4 || i == 9 || i == 10 || i == 16) - { - EXPECT_EQ(blocks.begin()->getByPosition(3).column->get64(0), 10); - } - if (i == 17) - { - EXPECT_EQ(blocks.begin()->getByPosition(3).column->get64(0), 8); - EXPECT_EQ(blocks.begin()->getByPosition(4).column->get64(0), 10); - } + std::shared_ptr params + = prepareWindowAggregator(10, key_site, key_columns, aggregate_columns, columns); + Streaming::Aggregator aggregator(*params); + Aggregator::AggregateColumns aggregate_column{aggregate_columns}; + aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); + auto blocks = aggregator.convertToBlocks(hash_map, true, 10); + checkWindowAggregationResult(blocks, results, position++); } } From df714d99785319655f2afa96af567455cb6a29ab Mon Sep 17 00:00:00 2001 From: juntao-lei-timeplus Date: Thu, 9 May 2024 15:11:09 +0800 Subject: [PATCH 3/7] cover changelog input,changelog,on uedate output cases. --- .../tests/gtest_streaming_aggregator.cpp | 542 ++++++++++++++---- 1 file changed, 443 insertions(+), 99 deletions(-) diff --git a/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp b/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp index b79a08dc8c..6b3c4ef376 100644 --- a/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp +++ b/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp @@ -11,6 +11,16 @@ #include #include "DataTypes/DataTypeInterval.h" #include "Interpreters/Streaming/TableFunctionDescription.h" + +class InputData +{ +public: + size_t int_data; + std::string str_data; + size_t time_data; + int delta_data; +}; + namespace DB { Block prepareHeader() @@ -33,31 +43,44 @@ Block prepareHeader() columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("string"), "str")); columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("fixed_string(3)"), "fixed_str")); columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("nullable(int8)"), "nullable")); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("datetime"), "window_start")); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("datetime"), "window_end")); - columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("datetime"), "_tp_time")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("datetime64(3)"), "window_start")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("datetime64(3)"), "window_end")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("datetime64(3)"), "_tp_time")); + columns.push_back(ColumnWithTypeAndName(DataTypeFactory::instance().get("int8"), "_tp_delta")); return Block(std::move(columns)); } -void prepareAggregates(AggregateDescriptions & aggregates) +void prepareAggregates(AggregateDescriptions & aggregates, bool is_changelog_input) { tryRegisterAggregateFunctions(); DataTypes insert_data_types = {std::make_shared()}; Array param; - DataTypes result_data_types = {std::make_shared()}; - ColumnNumbers arguments = {}; + DataTypes result_data_types = {std::make_shared()}; + ColumnNumbers arguments{}; AggregateFunctionProperties properties = {false, false}; + std::string name; + if (is_changelog_input) + { + name = "_tp_delta"; + arguments.push_back(20); + } + std::string function_name = is_changelog_input ? "__count_retract" : "count"; AggregateDescription aggregate{ - AggregateFunctionFactory::instance().get("count", result_data_types, param, properties, false), param, arguments, {}, "count()"}; + AggregateFunctionFactory::instance().get(function_name, result_data_types, param, properties, is_changelog_input), + param, + arguments, + {name}, + "count()"}; aggregates.push_back(aggregate); } -std::shared_ptr prepareParams(size_t max_threads, [[maybe_unused]] std::vector & key_site) +std::shared_ptr +prepareParams(size_t max_threads, bool is_changelog_input, [[maybe_unused]] std::vector & key_site) { Block src_header = prepareHeader(); ColumnNumbers keys = key_site; AggregateDescriptions aggregates; - prepareAggregates(aggregates); + prepareAggregates(aggregates, is_changelog_input); bool overflow_row{false}; size_t max_rows_to_group_by{0}; OverflowMode group_by_overflow_mode{OverflowMode::THROW}; @@ -74,7 +97,7 @@ std::shared_ptr prepareParams(size_t max_threads, bool keep_state{true}; size_t streaming_window_count{0}; Streaming::Aggregator::Params::GroupBy streaming_group_by{Streaming::Aggregator::Params::GroupBy::OTHER}; - ssize_t delta_col_pos{-1}; + ssize_t delta_col_pos{is_changelog_input ? 20 : -1}; size_t window_keys_num{0}; Streaming::WindowParamsPtr window_params{nullptr}; Streaming::TrackingUpdatesType tracking_updates_type{Streaming::TrackingUpdatesType::None}; @@ -113,8 +136,9 @@ auto createLowColumn(Args &&... args) return low_cardinality_data_type->createColumn(); }; -void setColumnsData(Columns & columns, [[maybe_unused]] std::vector & aggr_columns) +void setColumnsData(Columns & columns, InputData input_data, [[maybe_unused]] std::vector & aggr_columns) { + columns.clear(); auto c_int_8 = ColumnInt8::create(); auto c_int_16 = ColumnInt16::create(); auto c_int_32 = ColumnInt32::create(); @@ -133,34 +157,39 @@ void setColumnsData(Columns & columns, [[maybe_unused]] std::vector(std::make_shared()); - auto s_time_date_type = std::make_shared(std::make_shared()); - auto e_time_date_type = std::make_shared(std::make_shared()); + auto time_date_type = std::make_shared(3); + auto s_time_date_type = std::make_shared(3); + auto e_time_date_type = std::make_shared(3); auto c_datetime64 = time_date_type->createColumn(); auto c_time_start = s_time_date_type->createColumn(); auto c_time_end = e_time_date_type->createColumn(); + auto * c_datetime64_inner = typeid_cast *>(c_datetime64.get()); + auto * c_time_start_inner = typeid_cast *>(c_time_start.get()); + auto * c_time_end_inner = typeid_cast *>(c_time_end.get()); + auto c_delta = ColumnInt8::create(); for (size_t i = 0; i < 10; ++i) { - c_int_8->insertValue(8); - c_int_16->insertValue(8); - c_int_32->insertValue(8); - c_int_64->insertValue(8); - c_int_128->insertValue(8); - c_int_256->insertValue(8); - c_low_int8->insert(8); - c_low_int16->insert(8); - c_low_int32->insert(8); - c_low_int64->insert(8); - c_low_int128->insert(8); - c_low_int256->insert(8); - c_low_str->insertData("str", 3); - c_low_fixed_str->insertData("str", 3); - c_str->insertData("str", 3); - c_fixed_str->insertData("str", 3); - c_nullable->insert(8); - c_datetime64->insert(3); - c_time_start->insert(0); - c_time_end->insert(5); + c_int_8->insertValue(input_data.int_data); + c_int_16->insertValue(input_data.int_data); + c_int_32->insertValue(input_data.int_data); + c_int_64->insertValue(input_data.int_data); + c_int_128->insertValue(input_data.int_data); + c_int_256->insertValue(input_data.int_data); + c_low_int8->insert(input_data.int_data); + c_low_int16->insert(input_data.int_data); + c_low_int32->insert(input_data.int_data); + c_low_int64->insert(input_data.int_data); + c_low_int128->insert(input_data.int_data); + c_low_int256->insert(input_data.int_data); + c_low_str->insertData(input_data.str_data.c_str(), 3); + c_low_fixed_str->insertData(input_data.str_data.c_str(), 3); + c_str->insertData(input_data.str_data.c_str(), 3); + c_fixed_str->insertData(input_data.str_data.c_str(), 3); + c_nullable->insert(input_data.int_data); + c_datetime64_inner->insertValue(input_data.time_data); + c_time_start_inner->insertValue(input_data.time_data / 5 * 5); + c_time_end_inner->insertValue((input_data.time_data / 5 + 1) * 5); + c_delta->insert(input_data.delta_data); } columns.push_back(std::move(c_int_8)); columns.push_back(std::move(c_int_16)); @@ -182,6 +211,7 @@ void setColumnsData(Columns & columns, [[maybe_unused]] std::vector> prepareWindowCountKeys() return key_sites; } -using ResultType = std::variant; +using ResultType = std::variant; + +std::vector>> prepareGlobalAppendonlyAppendonlyResult() +{ + std::vector>> result{ + {{0, 0, Int64(8)}, {0, 1, Int64(9)}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, Int64(8)}, {0, 1, Int64(9)}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, Int64(8)}, {0, 1, Int64(9)}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, Int64(8)}, {0, 1, Int64(9)}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, Int64(8)}, {0, 1, Int64(9)}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, Int64(8)}, {0, 1, Int64(9)}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, Int64(8)}, {0, 1, Int64(9)}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, Int64(8)}, {0, 1, Int64(9)}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, Int64(8)}, {0, 1, Int64(9)}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, Int64(8)}, {0, 1, Int64(9)}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, Int64(8)}, {0, 1, Int64(9)}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, Int64(8)}, {0, 1, Int64(9)}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, std::string("str")}, {0, 1, std::string("sts")}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, std::string("str")}, {0, 1, std::string("sts")}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, std::string("str")}, {0, 1, std::string("sts")}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, std::string("str")}, {0, 1, std::string("sts")}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, Int8(8)}, {0, 1, Int8(9)}, {1, 0, Int64(10)}, {1, 1, Int64(10)}}, + {{0, 0, Int64(8)}, {0, 1, Int64(9)}, {1, 0, Int8(8)}, {1, 1, Int8(9)}, {2, 0, Int64(10)}, {2, 1, Int64(10)}}, + }; + return result; +}; + +std::vector>> prepareGlobalChangelogAppendonlyResult() +{ + std::vector>> result{ + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(0)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(0)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(0)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(0)}}, + {{0, 0, Int8(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int8(8)}, {2, 0, Int64(0)}}, + }; + return result; +}; -std::vector>> prepareGlobalResult() +std::vector>> prepareGlobalAppendonlyUpdateResult() { - std::vector>> result{ - {{0, size_t(8)}, {1, size_t(10)}}, - {{0, size_t(8)}, {1, size_t(10)}}, - {{0, size_t(8)}, {1, size_t(10)}}, - {{0, size_t(8)}, {1, size_t(10)}}, - {{0, size_t(8)}, {1, size_t(10)}}, - {{0, size_t(8)}, {1, size_t(10)}}, - {{0, size_t(8)}, {1, size_t(10)}}, - {{0, size_t(8)}, {1, size_t(10)}}, - {{0, size_t(8)}, {1, size_t(10)}}, - {{0, size_t(8)}, {1, size_t(10)}}, - {{0, size_t(8)}, {1, size_t(10)}}, - {{0, size_t(8)}, {1, size_t(10)}}, - {{0, std::string("str")}, {1, size_t(10)}}, - {{0, std::string("str")}, {1, size_t(10)}}, - {{0, std::string("str")}, {1, size_t(10)}}, - {{0, std::string("str")}, {1, size_t(10)}}, - {{0, size_t(8)}, {1, size_t(10)}}, - {{0, size_t(8)}, {1, size_t(8)}, {2, size_t(10)}}, + std::vector>> result{ + {{0, 0, Int64(9)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(9)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(9)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(9)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(9)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(9)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(9)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(9)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(9)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(9)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(9)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(9)}, {1, 0, Int64(10)}}, + {{0, 0, std::string("sts")}, {1, 0, Int64(10)}}, + {{0, 0, std::string("sts")}, {1, 0, Int64(10)}}, + {{0, 0, std::string("sts")}, {1, 0, Int64(10)}}, + {{0, 0, std::string("sts")}, {1, 0, Int64(10)}}, + {{0, 0, Int8(9)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(9)}, {1, 0, Int8(9)}, {2, 0, Int64(10)}}, }; return result; }; -std::vector>> prepareWindowResult() +std::vector>> prepareGlobalChangelogUpdateResult() { - std::vector>> result{ - {{1, size_t(10)}}, - {{2, size_t(10)}}, - {{2, size_t(8)}, {3, size_t(10)}}, - {{2, size_t(8)}, {3, size_t(10)}}, - {{2, size_t(8)}, {3, size_t(10)}}, - {{2, size_t(8)}, {3, size_t(10)}}, - {{2, size_t(8)}, {3, size_t(10)}}, - {{2, size_t(8)}, {3, size_t(8)}, {4, size_t(10)}}, + std::vector>> result{ + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(0)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(0)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(0)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(0)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(0)}}, + {{0, 0, Int8(8)}, {1, 0, Int64(0)}}, + {{0, 0, Int64(8)}, {1, 0, Int8(8)}, {2, 0, Int64(0)}}, }; return result; }; -void checkGlobalAggregationResult(BlocksList & blocks, std::vector>> & results, size_t position) +std::vector>> prepareGlobalAppendonlyChangelogResult() +{ + std::vector>> result{ + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(10)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(10)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(10)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(10)}}, + {{0, 0, Int8(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int8(8)}, {2, 0, Int64(10)}}, + }; + return result; +}; + +std::vector>> prepareGlobalChangelogChangelogResult() +{ + std::vector>> result{ + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int64(10)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(10)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(10)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(10)}}, + {{0, 0, std::string("str")}, {1, 0, Int64(10)}}, + {{0, 0, Int8(8)}, {1, 0, Int64(10)}}, + {{0, 0, Int64(8)}, {1, 0, Int8(8)}, {2, 0, Int64(10)}}, + }; + return result; +}; + +std::vector>> prepareWindowAppendonlyAppendonlyResult() +{ + std::vector>> result{ + {{1, 0, Int64(20)}}, + {{2, 0, Int64(20)}}, + {{2, 0, Int64(8)}, {3, 0, Int64(20)}}, + {{2, 0, Int64(8)}, {3, 0, Int64(20)}}, + {{2, 0, Int64(8)}, {3, 0, Int64(20)}}, + {{2, 0, Int64(8)}, {3, 0, Int64(20)}}, + {{2, 0, Int8(8)}, {3, 0, Int64(20)}}, + {{2, 0, Int64(8)}, {3, 0, Int8(8)}, {4, 0, Int64(20)}}, + }; + return result; +}; + +std::vector>> prepareWindowAppendonlyUpdateResult() +{ + std::vector>> result{ + {{1, 0, Int64(20)}}, + {{2, 0, Int64(20)}}, + {{2, 0, Int64(8)}, {3, 0, Int64(20)}}, + {{2, 0, Int64(8)}, {3, 0, Int64(20)}}, + {{2, 0, Int64(8)}, {3, 0, Int64(20)}}, + {{2, 0, Int64(8)}, {3, 0, Int64(20)}}, + {{2, 0, Int8(8)}, {3, 0, Int64(20)}}, + {{2, 0, Int64(8)}, {3, 0, Int8(8)}, {4, 0, Int64(20)}}, + }; + return result; +}; + +void checkAggregationResults( + BlocksList & blocks, std::vector>> & results, size_t position) { - EXPECT_EQ(blocks.size(), 1); auto & result = results[position]; - for (auto & [site, value] : result) + std::map result_map, expected_map; + for (auto & [column, raw, value] : result) { - if (std::holds_alternative(value)) + if (std::holds_alternative(value)) + { + expected_map[value]++; + result_map[static_cast(blocks.begin()->getByPosition(column).column->get64(raw))]++; + } + else if (std::holds_alternative(value)) { - EXPECT_EQ(blocks.begin()->getByPosition(site).column->get64(0), std::get(value)); + expected_map[value]++; + result_map[blocks.begin()->getByPosition(column).column->getInt(raw)]++; } else { - EXPECT_EQ(blocks.begin()->getByPosition(site).column->getDataAt(0), std::get(value)); + expected_map[value]++; + result_map[blocks.begin()->getByPosition(column).column->getDataAt(raw).toString()]++; } } + EXPECT_EQ(result_map, expected_map); } -void checkWindowAggregationResult(BlocksList & blocks, std::vector>> & results, size_t position) +void checkAggregationResult(Block & block, std::vector>> & results, size_t position) { - EXPECT_EQ(blocks.size(), 1); auto & result = results[position]; - - for (auto & [site, value] : result) + std::map result_map, expected_map; + for (auto & [column, raw, value] : result) { - if (std::holds_alternative(value)) + if (std::holds_alternative(value)) { - EXPECT_EQ(blocks.begin()->getByPosition(site).column->get64(0), std::get(value)); + expected_map[value]++; + result_map[static_cast(block.getByPosition(column).column->get64(raw))]++; + } + else if (std::holds_alternative(value)) + { + expected_map[value]++; + result_map[block.getByPosition(column).column->getInt(raw)]++; } else { - EXPECT_EQ(blocks.begin()->getByPosition(site).column->getDataAt(0), std::get(value)); + expected_map[value]++; + result_map[block.getByPosition(column).column->getDataAt(raw).toString()]++; } } + EXPECT_EQ(result_map, expected_map); } std::shared_ptr prepareGlobalAggregator( @@ -312,14 +498,14 @@ std::shared_ptr prepareGlobalAggregator( std::vector & key_site, ColumnRawPtrs & key_columns, [[maybe_unused]] ColumnRawPtrs & aggr_columns, - Columns & columns) + Columns & columns, + bool is_changelog_input = false) { - setColumnsData(columns, aggr_columns); - std::shared_ptr params = prepareParams(max_threads, key_site); - Streaming::Aggregator aggregator(*params); - for (auto & site : key_site) + setColumnsData(columns, {8, "str", 1, 1}, aggr_columns); + std::shared_ptr params = prepareParams(max_threads, is_changelog_input, key_site); + for (size_t i = 0; i < key_site.size(); ++i) { - key_columns.push_back(columns[site].get()); + key_columns.push_back(nullptr); } return params; } @@ -329,10 +515,11 @@ std::shared_ptr prepareWindowAggregator( std::vector & key_site, ColumnRawPtrs & key_columns, [[maybe_unused]] ColumnRawPtrs & aggr_columns, - Columns & columns) + Columns & columns, + bool is_changelog_input = false) { - setColumnsData(columns, aggr_columns); - std::shared_ptr params = prepareParams(max_threads, key_site); + setColumnsData(columns, {8, "str", 1, 1}, aggr_columns); + std::shared_ptr params = prepareParams(max_threads, is_changelog_input, key_site); params->group_by = {Streaming::Aggregator::Params::GroupBy::WINDOW_END}; params->window_keys_num = 2; if (key_site.size() == 1) @@ -355,22 +542,22 @@ std::shared_ptr prepareWindowAggregator( params->window_params = window_params; Streaming::Aggregator aggregator(*params); - for (auto & site : key_site) + for (size_t i = 0; i < key_site.size(); ++i) { - key_columns.push_back(columns[site].get()); + key_columns.push_back(nullptr); } return params; } -TEST(StreamingAggregation, globalcount) +TEST(StreamingAggregation, GlobalAppendOnlyAppendOnly) { initAggregation(); auto key_sites = prepareGlobalCountKeys(); - auto results = prepareGlobalResult(); + auto results = prepareGlobalAppendonlyAppendonlyResult(); size_t position = 0; for (auto & key_site : key_sites) { - Columns columns; + Columns columns, column_second; ColumnRawPtrs key_columns, aggregate_columns; Streaming::AggregatedDataVariants hash_map; std::shared_ptr params @@ -378,30 +565,187 @@ TEST(StreamingAggregation, globalcount) Streaming::Aggregator aggregator(*params); Aggregator::AggregateColumns aggregate_column{aggregate_columns}; aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); + auto block = aggregator.convertToBlocks(hash_map, true, 10); + setColumnsData(column_second, {9, "sts", 3, 1}, aggregate_columns); + aggregator.executeOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); auto blocks = aggregator.convertToBlocks(hash_map, true, 10); - checkGlobalAggregationResult(blocks, results, position++); + checkAggregationResults(blocks, results, position++); } } -TEST(StreamingAggregation, windowcount) +TEST(StreamingAggregation, GlobalChangelogAppendOnly) { initAggregation(); - auto key_sites = prepareWindowCountKeys(); - auto results = prepareWindowResult(); + auto key_sites = prepareGlobalCountKeys(); + auto results = prepareGlobalChangelogAppendonlyResult(); size_t position = 0; for (auto & key_site : key_sites) { - Columns columns; + Columns columns, column_second; ColumnRawPtrs key_columns, aggregate_columns; Streaming::AggregatedDataVariants hash_map; std::shared_ptr params - = prepareWindowAggregator(10, key_site, key_columns, aggregate_columns, columns); + = prepareGlobalAggregator(10, key_site, key_columns, aggregate_columns, columns, true); Streaming::Aggregator aggregator(*params); Aggregator::AggregateColumns aggregate_column{aggregate_columns}; aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); + auto block = aggregator.convertToBlocks(hash_map, true, 10); + setColumnsData(column_second, {8, "str", 1, -1}, aggregate_columns); + aggregator.executeOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); auto blocks = aggregator.convertToBlocks(hash_map, true, 10); - checkWindowAggregationResult(blocks, results, position++); + checkAggregationResults(blocks, results, position++); + } +} + +TEST(StreamingAggregation, GlobalAppendOnlyUpdates) +{ + initAggregation(); + auto key_sites = prepareGlobalCountKeys(); + auto results = prepareGlobalAppendonlyUpdateResult(); + size_t position = 0; + for (auto & key_site : key_sites) + { + Columns columns, column_second; + ColumnRawPtrs key_columns, aggregate_columns; + Streaming::AggregatedDataVariants hash_map; + std::shared_ptr params + = prepareGlobalAggregator(10, key_site, key_columns, aggregate_columns, columns); + params->tracking_updates_type = Streaming::TrackingUpdatesType::Updates; + Streaming::Aggregator aggregator(*params); + Aggregator::AggregateColumns aggregate_column{aggregate_columns}; + aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); + auto block = aggregator.convertUpdatesToBlocks(hash_map); + setColumnsData(column_second, {9, "sts", 3, 1}, aggregate_columns); + aggregator.executeOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); + auto blocks = aggregator.convertUpdatesToBlocks(hash_map); + checkAggregationResults(blocks, results, position++); + } +} + +TEST(StreamingAggregation, GlobalChangelogUpdates) +{ + initAggregation(); + auto key_sites = prepareGlobalCountKeys(); + auto results = prepareGlobalChangelogUpdateResult(); + size_t position = 0; + for (auto & key_site : key_sites) + { + Columns columns, column_second; + ColumnRawPtrs key_columns, aggregate_columns; + Streaming::AggregatedDataVariants hash_map; + std::shared_ptr params + = prepareGlobalAggregator(10, key_site, key_columns, aggregate_columns, columns, true); + params->tracking_updates_type = Streaming::TrackingUpdatesType::Updates; + Streaming::Aggregator aggregator(*params); + Aggregator::AggregateColumns aggregate_column{aggregate_columns}; + aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); + auto first_blocks = aggregator.convertUpdatesToBlocks(hash_map); + setColumnsData(column_second, {8, "str", 1, -1}, aggregate_columns); + aggregator.executeOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); + auto second_blocks = aggregator.convertUpdatesToBlocks(hash_map); + checkAggregationResults(second_blocks, results, position++); } } +TEST(StreamingAggregation, GlobalAppendOnlyChangelog) +{ + initAggregation(); + auto key_sites = prepareGlobalCountKeys(); + auto results = prepareGlobalAppendonlyChangelogResult(); + size_t position = 0; + for (auto & key_site : key_sites) + { + Columns columns, column_second; + ColumnRawPtrs key_columns, aggregate_columns; + Streaming::AggregatedDataVariants hash_map; + std::shared_ptr params + = prepareGlobalAggregator(10, key_site, key_columns, aggregate_columns, columns); + params->tracking_updates_type = Streaming::TrackingUpdatesType::UpdatesWithRetract; + Streaming::Aggregator aggregator(*params); + Aggregator::AggregateColumns aggregate_column{aggregate_columns}; + aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); + auto block = aggregator.convertRetractToBlocks(hash_map); + setColumnsData(column_second, {8, "str", 1, 1}, aggregate_columns); + aggregator.executeAndRetractOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); + auto blocks = aggregator.convertRetractToBlocks(hash_map); + checkAggregationResults(blocks, results, position++); + } +} + +TEST(StreamingAggregation, GlobalChangelogChangelog) +{ + initAggregation(); + auto key_sites = prepareGlobalCountKeys(); + auto results = prepareGlobalAppendonlyChangelogResult(); + size_t position = 0; + for (auto & key_site : key_sites) + { + Columns columns, column_second; + ColumnRawPtrs key_columns, aggregate_columns; + Streaming::AggregatedDataVariants hash_map; + std::shared_ptr params + = prepareGlobalAggregator(10, key_site, key_columns, aggregate_columns, columns, true); + params->tracking_updates_type = Streaming::TrackingUpdatesType::UpdatesWithRetract; + Streaming::Aggregator aggregator(*params); + Aggregator::AggregateColumns aggregate_column{aggregate_columns}; + aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); + auto block = aggregator.convertRetractToBlocks(hash_map); + setColumnsData(column_second, {8, "str", 1, 1}, aggregate_columns); + aggregator.executeAndRetractOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); + auto blocks = aggregator.convertRetractToBlocks(hash_map); + auto block_second = aggregator.convertToBlocks(hash_map, true, 10); + checkAggregationResults(blocks, results, position++); + } +} + +TEST(StreamingAggregation, WindowAppendOnlyAppendOnly) +{ + initAggregation(); + auto key_sites = prepareWindowCountKeys(); + auto results = prepareWindowAppendonlyAppendonlyResult(); + size_t position = 0; + for (auto & key_site : key_sites) + { + Columns columns, column_second, column_tird; + ColumnRawPtrs key_columns, aggregate_columns; + Streaming::AggregatedDataVariants hash_map; + std::shared_ptr params + = prepareWindowAggregator(1, key_site, key_columns, aggregate_columns, columns); + Streaming::Aggregator aggregator(*params); + Aggregator::AggregateColumns aggregate_column{aggregate_columns}; + aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); + // auto block_first = aggregator.spliceAndConvertToBlock(hash_map, true, {0}); + setColumnsData(column_second, {8, "str", 3, 1}, aggregate_columns); + aggregator.executeOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); + auto block_second = aggregator.spliceAndConvertToBlock(hash_map, true, {0}); + + checkAggregationResult(block_second, results, position++); + } +} + +TEST(StreamingAggregation, WindowAppendOnlyUpdate) +{ + initAggregation(); + auto key_sites = prepareWindowCountKeys(); + auto results = prepareWindowAppendonlyUpdateResult(); + size_t position = 0; + for (auto & key_site : key_sites) + { + Columns columns, column_second; + ColumnRawPtrs key_columns, aggregate_columns; + Streaming::AggregatedDataVariants hash_map; + std::shared_ptr params + = prepareWindowAggregator(10, key_site, key_columns, aggregate_columns, columns); + params->tracking_updates_type = Streaming::TrackingUpdatesType::Updates; + Streaming::Aggregator aggregator(*params); + Aggregator::AggregateColumns aggregate_column{aggregate_columns}; + aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); + // auto block_first = aggregator.spliceAndConvertUpdatesToBlock(hash_map, {0}); + setColumnsData(column_second, {8, "str", 3, 1}, aggregate_columns); + aggregator.executeOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); + // aggregator.executeOnBlock(column_second, 0, 20, hash_map, key_columns, aggregate_column); + auto block_second = aggregator.spliceAndConvertUpdatesToBlock(hash_map, {0}); + checkAggregationResult(block_second, results, position++); + } +} }; \ No newline at end of file From 820c3ec932c45bc79dde52667a32cb1f00a5a6ea Mon Sep 17 00:00:00 2001 From: juntao-lei-timeplus Date: Wed, 15 May 2024 17:57:05 +0800 Subject: [PATCH 4/7] fix --- .../tests/gtest_streaming_aggregator.cpp | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp b/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp index 6b3c4ef376..a919a2a6d5 100644 --- a/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp +++ b/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp @@ -97,7 +97,7 @@ prepareParams(size_t max_threads, bool is_changelog_input, [[maybe_unused]] std: bool keep_state{true}; size_t streaming_window_count{0}; Streaming::Aggregator::Params::GroupBy streaming_group_by{Streaming::Aggregator::Params::GroupBy::OTHER}; - ssize_t delta_col_pos{is_changelog_input ? 20 : -1}; + int delta_col_pos{is_changelog_input ? 20 : -1}; size_t window_keys_num{0}; Streaming::WindowParamsPtr window_params{nullptr}; Streaming::TrackingUpdatesType tracking_updates_type{Streaming::TrackingUpdatesType::None}; @@ -173,14 +173,14 @@ void setColumnsData(Columns & columns, InputData input_data, [[maybe_unused]] st c_int_16->insertValue(input_data.int_data); c_int_32->insertValue(input_data.int_data); c_int_64->insertValue(input_data.int_data); - c_int_128->insertValue(input_data.int_data); - c_int_256->insertValue(input_data.int_data); + c_int_128->insertValue(Int128(input_data.int_data)); + c_int_256->insertValue(Int256(input_data.int_data)); c_low_int8->insert(input_data.int_data); c_low_int16->insert(input_data.int_data); c_low_int32->insert(input_data.int_data); c_low_int64->insert(input_data.int_data); - c_low_int128->insert(input_data.int_data); - c_low_int256->insert(input_data.int_data); + c_low_int128->insert(Int128(input_data.int_data)); + c_low_int256->insert(Int256(input_data.int_data)); c_low_str->insertData(input_data.str_data.c_str(), 3); c_low_fixed_str->insertData(input_data.str_data.c_str(), 3); c_str->insertData(input_data.str_data.c_str(), 3); @@ -415,8 +415,8 @@ std::vector>> prepareGlobalCh std::vector>> prepareWindowAppendonlyAppendonlyResult() { std::vector>> result{ - {{1, 0, Int64(20)}}, - {{2, 0, Int64(20)}}, + {{1, 0, Int64(20)}}, + {{2, 0, Int64(20)}}, {{2, 0, Int64(8)}, {3, 0, Int64(20)}}, {{2, 0, Int64(8)}, {3, 0, Int64(20)}}, {{2, 0, Int64(8)}, {3, 0, Int64(20)}}, @@ -430,7 +430,7 @@ std::vector>> prepareWindowAp std::vector>> prepareWindowAppendonlyUpdateResult() { std::vector>> result{ - {{1, 0, Int64(20)}}, + {{1, 0, Int64(20)}}, {{2, 0, Int64(20)}}, {{2, 0, Int64(8)}, {3, 0, Int64(20)}}, {{2, 0, Int64(8)}, {3, 0, Int64(20)}}, @@ -714,7 +714,6 @@ TEST(StreamingAggregation, WindowAppendOnlyAppendOnly) Streaming::Aggregator aggregator(*params); Aggregator::AggregateColumns aggregate_column{aggregate_columns}; aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); - // auto block_first = aggregator.spliceAndConvertToBlock(hash_map, true, {0}); setColumnsData(column_second, {8, "str", 3, 1}, aggregate_columns); aggregator.executeOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); auto block_second = aggregator.spliceAndConvertToBlock(hash_map, true, {0}); @@ -740,10 +739,8 @@ TEST(StreamingAggregation, WindowAppendOnlyUpdate) Streaming::Aggregator aggregator(*params); Aggregator::AggregateColumns aggregate_column{aggregate_columns}; aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); - // auto block_first = aggregator.spliceAndConvertUpdatesToBlock(hash_map, {0}); setColumnsData(column_second, {8, "str", 3, 1}, aggregate_columns); aggregator.executeOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); - // aggregator.executeOnBlock(column_second, 0, 20, hash_map, key_columns, aggregate_column); auto block_second = aggregator.spliceAndConvertUpdatesToBlock(hash_map, {0}); checkAggregationResult(block_second, results, position++); } From 7605b35d7938e9ad6e662249d209b7117f520f17 Mon Sep 17 00:00:00 2001 From: juntao-lei-timeplus Date: Thu, 16 May 2024 20:47:52 +0800 Subject: [PATCH 5/7] Refactor code for improved readability and maintainability --- .../tests/gtest_streaming_aggregator.cpp | 507 ++++++++---------- 1 file changed, 231 insertions(+), 276 deletions(-) diff --git a/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp b/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp index a919a2a6d5..2751a720fe 100644 --- a/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp +++ b/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp @@ -214,12 +214,161 @@ void setColumnsData(Columns & columns, InputData input_data, [[maybe_unused]] st columns.push_back(std::move(c_delta)); } +std::shared_ptr prepareGlobalAggregator( + size_t max_threads, + std::vector & key_site, + ColumnRawPtrs & key_columns, + InputData & input_data, + [[maybe_unused]] ColumnRawPtrs & aggr_columns, + Columns & columns, + bool is_changelog_input = false) +{ + setColumnsData(columns, input_data, aggr_columns); + std::shared_ptr params = prepareParams(max_threads, is_changelog_input, key_site); + for (size_t i = 0; i < key_site.size(); ++i) + { + key_columns.push_back(nullptr); + } + return params; +} + +std::shared_ptr prepareWindowAggregator( + size_t max_threads, + std::vector & key_site, + ColumnRawPtrs & key_columns, + InputData & input_data, + [[maybe_unused]] ColumnRawPtrs & aggr_columns, + Columns & columns, + bool is_changelog_input = false) +{ + setColumnsData(columns, input_data, aggr_columns); + std::shared_ptr params = prepareParams(max_threads, is_changelog_input, key_site); + params->group_by = {Streaming::Aggregator::Params::GroupBy::WINDOW_END}; + params->window_keys_num = 2; + if (key_site.size() == 1) + params->window_keys_num = 1; + ParserFunction func_parser; + auto ast = parseQuery(func_parser, "tumble(stream, 5s)", 0, 10000); + NamesAndTypesList columns_list; + if (key_site.size() == 1) + columns_list = {{"window_start", std::make_shared(3, "UTC")}}; + columns_list.push_back({"window_end", std::make_shared(3, "UTC")}); + Streaming::TableFunctionDescriptionPtr table_function_description = std::make_shared( + ast, + Streaming::WindowType::Tumble, + Names{"_tp_time", "5s"}, + DataTypes{std::make_shared(3, "UTC"), std::make_shared(IntervalKind::Second)}, + std::shared_ptr(), + Names{"_tp_time"}, + columns_list); + Streaming::WindowParamsPtr window_params = Streaming::WindowParams::create(table_function_description); + + params->window_params = window_params; + Streaming::Aggregator aggregator(*params); + for (size_t i = 0; i < key_site.size(); ++i) + { + key_columns.push_back(nullptr); + } + return params; +} + +using ResultType = std::variant; + +void checkAggregationResults( + BlocksList & blocks, std::vector>> & results, size_t position) +{ + auto & result = results[position]; + std::map result_map, expected_map; + for (auto & [column, raw, value] : result) + { + if (std::holds_alternative(value)) + { + expected_map[value]++; + result_map[static_cast(blocks.begin()->getByPosition(column).column->get64(raw))]++; + } + else if (std::holds_alternative(value)) + { + expected_map[value]++; + result_map[blocks.begin()->getByPosition(column).column->getInt(raw)]++; + } + else + { + expected_map[value]++; + result_map[blocks.begin()->getByPosition(column).column->getDataAt(raw).toString()]++; + } + } + EXPECT_EQ(result_map, expected_map); +} + +auto convertToResult( + Streaming::Aggregator & aggregator, + Streaming::AggregatedDataVariants & hash_map, + InputData & input_data, + bool is_changelog_output, + bool is_update_output, + bool is_window_aggregator) +{ + if (is_update_output && is_window_aggregator) + return std::list{aggregator.spliceAndConvertUpdatesToBlock(hash_map, {Int64(input_data.time_data / 5 * 5)})}; + else if (is_window_aggregator) + return std::list{aggregator.spliceAndConvertToBlock(hash_map, true, {Int64(input_data.time_data / 5 * 5)})}; + else if (is_changelog_output) + return aggregator.convertRetractToBlocks(hash_map); + else if (is_update_output) + return aggregator.convertUpdatesToBlocks(hash_map); + else + return aggregator.convertToBlocks(hash_map, true, 10); +} + void initAggregation() { tryRegisterFormats(); tryRegisterAggregateFunctions(); } +template +void executeAggregatorTest( + KeyFunc key_func, + ResultFunc result_func, + AggregatorFunc aggregator_func, + InputData first_input_data, + InputData second_input_data, + bool is_changelog_input, + bool is_changelog_output, + bool is_update_output, + bool is_window_aggregator) +{ + initAggregation(); + auto key_sites = key_func(); + auto results = result_func(); + size_t position = 0; + for (auto & key_site : key_sites) + { + Columns columns_first, columns_second; + ColumnRawPtrs key_columns, aggregate_columns; + Streaming::AggregatedDataVariants hash_map; + std::shared_ptr params + = aggregator_func(10, key_site, key_columns, first_input_data, aggregate_columns, columns_first, is_changelog_input); + if (is_changelog_output) + params->tracking_updates_type = Streaming::TrackingUpdatesType::UpdatesWithRetract; + if (is_update_output) + params->tracking_updates_type = Streaming::TrackingUpdatesType::Updates; + Streaming::Aggregator aggregator(*params); + Aggregator::AggregateColumns aggregate_column{aggregate_columns}; + aggregator.executeOnBlock(columns_first, 0, 10, hash_map, key_columns, aggregate_column); + auto block_first + = convertToResult(aggregator, hash_map, first_input_data, is_changelog_output, is_update_output, is_window_aggregator); + setColumnsData(columns_second, second_input_data, aggregate_columns); + if (is_changelog_output) + aggregator.executeAndRetractOnBlock(columns_second, 0, 10, hash_map, key_columns, aggregate_column); + else + aggregator.executeOnBlock(columns_second, 0, 10, hash_map, key_columns, aggregate_column); + auto block_second + = convertToResult(aggregator, hash_map, second_input_data, is_changelog_output, is_update_output, is_window_aggregator); + checkAggregationResults(block_second, results, position++); + } +} + std::vector> prepareGlobalCountKeys() { std::vector> key_sites{ @@ -260,8 +409,6 @@ std::vector> prepareWindowCountKeys() return key_sites; } -using ResultType = std::variant; - std::vector>> prepareGlobalAppendonlyAppendonlyResult() { std::vector>> result{ @@ -442,307 +589,115 @@ std::vector>> prepareWindowAp return result; }; -void checkAggregationResults( - BlocksList & blocks, std::vector>> & results, size_t position) -{ - auto & result = results[position]; - std::map result_map, expected_map; - for (auto & [column, raw, value] : result) - { - if (std::holds_alternative(value)) - { - expected_map[value]++; - result_map[static_cast(blocks.begin()->getByPosition(column).column->get64(raw))]++; - } - else if (std::holds_alternative(value)) - { - expected_map[value]++; - result_map[blocks.begin()->getByPosition(column).column->getInt(raw)]++; - } - else - { - expected_map[value]++; - result_map[blocks.begin()->getByPosition(column).column->getDataAt(raw).toString()]++; - } - } - EXPECT_EQ(result_map, expected_map); -} - -void checkAggregationResult(Block & block, std::vector>> & results, size_t position) -{ - auto & result = results[position]; - std::map result_map, expected_map; - for (auto & [column, raw, value] : result) - { - if (std::holds_alternative(value)) - { - expected_map[value]++; - result_map[static_cast(block.getByPosition(column).column->get64(raw))]++; - } - else if (std::holds_alternative(value)) - { - expected_map[value]++; - result_map[block.getByPosition(column).column->getInt(raw)]++; - } - else - { - expected_map[value]++; - result_map[block.getByPosition(column).column->getDataAt(raw).toString()]++; - } - } - EXPECT_EQ(result_map, expected_map); -} - -std::shared_ptr prepareGlobalAggregator( - size_t max_threads, - std::vector & key_site, - ColumnRawPtrs & key_columns, - [[maybe_unused]] ColumnRawPtrs & aggr_columns, - Columns & columns, - bool is_changelog_input = false) -{ - setColumnsData(columns, {8, "str", 1, 1}, aggr_columns); - std::shared_ptr params = prepareParams(max_threads, is_changelog_input, key_site); - for (size_t i = 0; i < key_site.size(); ++i) - { - key_columns.push_back(nullptr); - } - return params; -} - -std::shared_ptr prepareWindowAggregator( - size_t max_threads, - std::vector & key_site, - ColumnRawPtrs & key_columns, - [[maybe_unused]] ColumnRawPtrs & aggr_columns, - Columns & columns, - bool is_changelog_input = false) -{ - setColumnsData(columns, {8, "str", 1, 1}, aggr_columns); - std::shared_ptr params = prepareParams(max_threads, is_changelog_input, key_site); - params->group_by = {Streaming::Aggregator::Params::GroupBy::WINDOW_END}; - params->window_keys_num = 2; - if (key_site.size() == 1) - params->window_keys_num = 1; - ParserFunction func_parser; - auto ast = parseQuery(func_parser, "tumble(stream, 5s)", 0, 10000); - NamesAndTypesList columns_list; - if (key_site.size() == 1) - columns_list = {{"window_start", std::make_shared(3, "UTC")}}; - columns_list.push_back({"window_end", std::make_shared(3, "UTC")}); - Streaming::TableFunctionDescriptionPtr table_function_description = std::make_shared( - ast, - Streaming::WindowType::Tumble, - Names{"_tp_time", "5s"}, - DataTypes{std::make_shared(3, "UTC"), std::make_shared(IntervalKind::Second)}, - std::shared_ptr(), - Names{"_tp_time"}, - columns_list); - Streaming::WindowParamsPtr window_params = Streaming::WindowParams::create(table_function_description); - - params->window_params = window_params; - Streaming::Aggregator aggregator(*params); - for (size_t i = 0; i < key_site.size(); ++i) - { - key_columns.push_back(nullptr); - } - return params; -} - TEST(StreamingAggregation, GlobalAppendOnlyAppendOnly) { - initAggregation(); - auto key_sites = prepareGlobalCountKeys(); - auto results = prepareGlobalAppendonlyAppendonlyResult(); - size_t position = 0; - for (auto & key_site : key_sites) - { - Columns columns, column_second; - ColumnRawPtrs key_columns, aggregate_columns; - Streaming::AggregatedDataVariants hash_map; - std::shared_ptr params - = prepareGlobalAggregator(10, key_site, key_columns, aggregate_columns, columns); - Streaming::Aggregator aggregator(*params); - Aggregator::AggregateColumns aggregate_column{aggregate_columns}; - aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); - auto block = aggregator.convertToBlocks(hash_map, true, 10); - setColumnsData(column_second, {9, "sts", 3, 1}, aggregate_columns); - aggregator.executeOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); - auto blocks = aggregator.convertToBlocks(hash_map, true, 10); - checkAggregationResults(blocks, results, position++); - } + executeAggregatorTest( + prepareGlobalCountKeys, + prepareGlobalAppendonlyAppendonlyResult, + prepareGlobalAggregator, + {8, "str", 1, 1}, + {9, "sts", 3, 1}, + false, + false, + false, + false); } TEST(StreamingAggregation, GlobalChangelogAppendOnly) { - initAggregation(); - auto key_sites = prepareGlobalCountKeys(); - auto results = prepareGlobalChangelogAppendonlyResult(); - size_t position = 0; - for (auto & key_site : key_sites) - { - Columns columns, column_second; - ColumnRawPtrs key_columns, aggregate_columns; - Streaming::AggregatedDataVariants hash_map; - std::shared_ptr params - = prepareGlobalAggregator(10, key_site, key_columns, aggregate_columns, columns, true); - Streaming::Aggregator aggregator(*params); - Aggregator::AggregateColumns aggregate_column{aggregate_columns}; - aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); - auto block = aggregator.convertToBlocks(hash_map, true, 10); - setColumnsData(column_second, {8, "str", 1, -1}, aggregate_columns); - aggregator.executeOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); - auto blocks = aggregator.convertToBlocks(hash_map, true, 10); - checkAggregationResults(blocks, results, position++); - } + executeAggregatorTest( + prepareGlobalCountKeys, + prepareGlobalChangelogAppendonlyResult, + prepareGlobalAggregator, + {8, "str", 1, 1}, + {8, "str", 1, -1}, + true, + false, + false, + false); } -TEST(StreamingAggregation, GlobalAppendOnlyUpdates) +TEST(StreamingAggregation, GlobalAppendOnlyUpdate) { - initAggregation(); - auto key_sites = prepareGlobalCountKeys(); - auto results = prepareGlobalAppendonlyUpdateResult(); - size_t position = 0; - for (auto & key_site : key_sites) - { - Columns columns, column_second; - ColumnRawPtrs key_columns, aggregate_columns; - Streaming::AggregatedDataVariants hash_map; - std::shared_ptr params - = prepareGlobalAggregator(10, key_site, key_columns, aggregate_columns, columns); - params->tracking_updates_type = Streaming::TrackingUpdatesType::Updates; - Streaming::Aggregator aggregator(*params); - Aggregator::AggregateColumns aggregate_column{aggregate_columns}; - aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); - auto block = aggregator.convertUpdatesToBlocks(hash_map); - setColumnsData(column_second, {9, "sts", 3, 1}, aggregate_columns); - aggregator.executeOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); - auto blocks = aggregator.convertUpdatesToBlocks(hash_map); - checkAggregationResults(blocks, results, position++); - } + executeAggregatorTest( + prepareGlobalCountKeys, + prepareGlobalAppendonlyUpdateResult, + prepareGlobalAggregator, + {8, "str", 1, 1}, + {9, "sts", 3, 1}, + false, + false, + true, + false); } -TEST(StreamingAggregation, GlobalChangelogUpdates) +TEST(StreamingAggregation, GlobalChangelogUpdate) { - initAggregation(); - auto key_sites = prepareGlobalCountKeys(); - auto results = prepareGlobalChangelogUpdateResult(); - size_t position = 0; - for (auto & key_site : key_sites) - { - Columns columns, column_second; - ColumnRawPtrs key_columns, aggregate_columns; - Streaming::AggregatedDataVariants hash_map; - std::shared_ptr params - = prepareGlobalAggregator(10, key_site, key_columns, aggregate_columns, columns, true); - params->tracking_updates_type = Streaming::TrackingUpdatesType::Updates; - Streaming::Aggregator aggregator(*params); - Aggregator::AggregateColumns aggregate_column{aggregate_columns}; - aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); - auto first_blocks = aggregator.convertUpdatesToBlocks(hash_map); - setColumnsData(column_second, {8, "str", 1, -1}, aggregate_columns); - aggregator.executeOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); - auto second_blocks = aggregator.convertUpdatesToBlocks(hash_map); - checkAggregationResults(second_blocks, results, position++); - } + executeAggregatorTest( + prepareGlobalCountKeys, + prepareGlobalChangelogUpdateResult, + prepareGlobalAggregator, + {8, "str", 1, 1}, + {8, "str", 1, -1}, + true, + false, + true, + false); } TEST(StreamingAggregation, GlobalAppendOnlyChangelog) { - initAggregation(); - auto key_sites = prepareGlobalCountKeys(); - auto results = prepareGlobalAppendonlyChangelogResult(); - size_t position = 0; - for (auto & key_site : key_sites) - { - Columns columns, column_second; - ColumnRawPtrs key_columns, aggregate_columns; - Streaming::AggregatedDataVariants hash_map; - std::shared_ptr params - = prepareGlobalAggregator(10, key_site, key_columns, aggregate_columns, columns); - params->tracking_updates_type = Streaming::TrackingUpdatesType::UpdatesWithRetract; - Streaming::Aggregator aggregator(*params); - Aggregator::AggregateColumns aggregate_column{aggregate_columns}; - aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); - auto block = aggregator.convertRetractToBlocks(hash_map); - setColumnsData(column_second, {8, "str", 1, 1}, aggregate_columns); - aggregator.executeAndRetractOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); - auto blocks = aggregator.convertRetractToBlocks(hash_map); - checkAggregationResults(blocks, results, position++); - } + executeAggregatorTest( + prepareGlobalCountKeys, + prepareGlobalAppendonlyChangelogResult, + prepareGlobalAggregator, + {8, "str", 1, 1}, + {8, "str", 3, 1}, + false, + true, + false, + false); } TEST(StreamingAggregation, GlobalChangelogChangelog) { - initAggregation(); - auto key_sites = prepareGlobalCountKeys(); - auto results = prepareGlobalAppendonlyChangelogResult(); - size_t position = 0; - for (auto & key_site : key_sites) - { - Columns columns, column_second; - ColumnRawPtrs key_columns, aggregate_columns; - Streaming::AggregatedDataVariants hash_map; - std::shared_ptr params - = prepareGlobalAggregator(10, key_site, key_columns, aggregate_columns, columns, true); - params->tracking_updates_type = Streaming::TrackingUpdatesType::UpdatesWithRetract; - Streaming::Aggregator aggregator(*params); - Aggregator::AggregateColumns aggregate_column{aggregate_columns}; - aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); - auto block = aggregator.convertRetractToBlocks(hash_map); - setColumnsData(column_second, {8, "str", 1, 1}, aggregate_columns); - aggregator.executeAndRetractOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); - auto blocks = aggregator.convertRetractToBlocks(hash_map); - auto block_second = aggregator.convertToBlocks(hash_map, true, 10); - checkAggregationResults(blocks, results, position++); - } + executeAggregatorTest( + prepareGlobalCountKeys, + prepareGlobalChangelogChangelogResult, + prepareGlobalAggregator, + {8, "str", 1, 1}, + {8, "str", 3, 1}, + true, + true, + false, + false); } TEST(StreamingAggregation, WindowAppendOnlyAppendOnly) { - initAggregation(); - auto key_sites = prepareWindowCountKeys(); - auto results = prepareWindowAppendonlyAppendonlyResult(); - size_t position = 0; - for (auto & key_site : key_sites) - { - Columns columns, column_second, column_tird; - ColumnRawPtrs key_columns, aggregate_columns; - Streaming::AggregatedDataVariants hash_map; - std::shared_ptr params - = prepareWindowAggregator(1, key_site, key_columns, aggregate_columns, columns); - Streaming::Aggregator aggregator(*params); - Aggregator::AggregateColumns aggregate_column{aggregate_columns}; - aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); - setColumnsData(column_second, {8, "str", 3, 1}, aggregate_columns); - aggregator.executeOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); - auto block_second = aggregator.spliceAndConvertToBlock(hash_map, true, {0}); - - checkAggregationResult(block_second, results, position++); - } + executeAggregatorTest( + prepareWindowCountKeys, + prepareWindowAppendonlyAppendonlyResult, + prepareWindowAggregator, + {8, "str", 1, 1}, + {8, "str", 3, 1}, + false, + false, + false, + true); } TEST(StreamingAggregation, WindowAppendOnlyUpdate) { - initAggregation(); - auto key_sites = prepareWindowCountKeys(); - auto results = prepareWindowAppendonlyUpdateResult(); - size_t position = 0; - for (auto & key_site : key_sites) - { - Columns columns, column_second; - ColumnRawPtrs key_columns, aggregate_columns; - Streaming::AggregatedDataVariants hash_map; - std::shared_ptr params - = prepareWindowAggregator(10, key_site, key_columns, aggregate_columns, columns); - params->tracking_updates_type = Streaming::TrackingUpdatesType::Updates; - Streaming::Aggregator aggregator(*params); - Aggregator::AggregateColumns aggregate_column{aggregate_columns}; - aggregator.executeOnBlock(columns, 0, 10, hash_map, key_columns, aggregate_column); - setColumnsData(column_second, {8, "str", 3, 1}, aggregate_columns); - aggregator.executeOnBlock(column_second, 0, 10, hash_map, key_columns, aggregate_column); - auto block_second = aggregator.spliceAndConvertUpdatesToBlock(hash_map, {0}); - checkAggregationResult(block_second, results, position++); - } + executeAggregatorTest( + prepareWindowCountKeys, + prepareWindowAppendonlyUpdateResult, + prepareWindowAggregator, + {8, "str", 1, 1}, + {8, "str", 3, 1}, + false, + false, + true, + true); } }; \ No newline at end of file From 34f412302d4e768d44d9b860f0e8151f5515f4f9 Mon Sep 17 00:00:00 2001 From: juntao-lei-timeplus Date: Mon, 20 May 2024 10:52:17 +0800 Subject: [PATCH 6/7] fix --- .../Streaming/tests/gtest_streaming_aggregator.cpp | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp b/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp index 2751a720fe..9df57ffeca 100644 --- a/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp +++ b/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -353,18 +354,18 @@ void executeAggregatorTest( params->tracking_updates_type = Streaming::TrackingUpdatesType::UpdatesWithRetract; if (is_update_output) params->tracking_updates_type = Streaming::TrackingUpdatesType::Updates; - Streaming::Aggregator aggregator(*params); + auto aggregator=std::make_shared(*params); Aggregator::AggregateColumns aggregate_column{aggregate_columns}; - aggregator.executeOnBlock(columns_first, 0, 10, hash_map, key_columns, aggregate_column); + aggregator->executeOnBlock(columns_first, 0, 10, hash_map, key_columns, aggregate_column); auto block_first - = convertToResult(aggregator, hash_map, first_input_data, is_changelog_output, is_update_output, is_window_aggregator); + = convertToResult(*aggregator, hash_map, first_input_data, is_changelog_output, is_update_output, is_window_aggregator); setColumnsData(columns_second, second_input_data, aggregate_columns); if (is_changelog_output) - aggregator.executeAndRetractOnBlock(columns_second, 0, 10, hash_map, key_columns, aggregate_column); + aggregator->executeAndRetractOnBlock(columns_second, 0, 10, hash_map, key_columns, aggregate_column); else - aggregator.executeOnBlock(columns_second, 0, 10, hash_map, key_columns, aggregate_column); + aggregator->executeOnBlock(columns_second, 0, 10, hash_map, key_columns, aggregate_column); auto block_second - = convertToResult(aggregator, hash_map, second_input_data, is_changelog_output, is_update_output, is_window_aggregator); + = convertToResult(*aggregator, hash_map, second_input_data, is_changelog_output, is_update_output, is_window_aggregator); checkAggregationResults(block_second, results, position++); } } From a4e4c2dd2e2e10694e1d6ed5a2e2bcf20e998948 Mon Sep 17 00:00:00 2001 From: juntao-lei-timeplus Date: Mon, 20 May 2024 14:47:47 +0800 Subject: [PATCH 7/7] fix --- .../tests/gtest_streaming_aggregator.cpp | 43 +++++++++---------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp b/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp index 9df57ffeca..66b194b5ac 100644 --- a/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp +++ b/src/Interpreters/Streaming/tests/gtest_streaming_aggregator.cpp @@ -75,11 +75,10 @@ void prepareAggregates(AggregateDescriptions & aggregates, bool is_changelog_inp aggregates.push_back(aggregate); } -std::shared_ptr -prepareParams(size_t max_threads, bool is_changelog_input, [[maybe_unused]] std::vector & key_site) +Streaming::Aggregator::Params prepareParams(size_t max_threads, bool is_changelog_input, [[maybe_unused]] std::vector & key_site) { Block src_header = prepareHeader(); - ColumnNumbers keys = key_site; + ColumnNumbers & keys = key_site; AggregateDescriptions aggregates; prepareAggregates(aggregates, is_changelog_input); bool overflow_row{false}; @@ -102,7 +101,7 @@ prepareParams(size_t max_threads, bool is_changelog_input, [[maybe_unused]] std: size_t window_keys_num{0}; Streaming::WindowParamsPtr window_params{nullptr}; Streaming::TrackingUpdatesType tracking_updates_type{Streaming::TrackingUpdatesType::None}; - return make_shared( + return Streaming::Aggregator::Params( src_header, keys, aggregates, @@ -215,7 +214,7 @@ void setColumnsData(Columns & columns, InputData input_data, [[maybe_unused]] st columns.push_back(std::move(c_delta)); } -std::shared_ptr prepareGlobalAggregator( +Streaming::Aggregator::Params prepareGlobalAggregator( size_t max_threads, std::vector & key_site, ColumnRawPtrs & key_columns, @@ -225,7 +224,7 @@ std::shared_ptr prepareGlobalAggregator( bool is_changelog_input = false) { setColumnsData(columns, input_data, aggr_columns); - std::shared_ptr params = prepareParams(max_threads, is_changelog_input, key_site); + auto params = prepareParams(max_threads, is_changelog_input, key_site); for (size_t i = 0; i < key_site.size(); ++i) { key_columns.push_back(nullptr); @@ -233,7 +232,7 @@ std::shared_ptr prepareGlobalAggregator( return params; } -std::shared_ptr prepareWindowAggregator( +Streaming::Aggregator::Params prepareWindowAggregator( size_t max_threads, std::vector & key_site, ColumnRawPtrs & key_columns, @@ -243,11 +242,11 @@ std::shared_ptr prepareWindowAggregator( bool is_changelog_input = false) { setColumnsData(columns, input_data, aggr_columns); - std::shared_ptr params = prepareParams(max_threads, is_changelog_input, key_site); - params->group_by = {Streaming::Aggregator::Params::GroupBy::WINDOW_END}; - params->window_keys_num = 2; + auto params = prepareParams(max_threads, is_changelog_input, key_site); + params.group_by = {Streaming::Aggregator::Params::GroupBy::WINDOW_END}; + params.window_keys_num = 2; if (key_site.size() == 1) - params->window_keys_num = 1; + params.window_keys_num = 1; ParserFunction func_parser; auto ast = parseQuery(func_parser, "tumble(stream, 5s)", 0, 10000); NamesAndTypesList columns_list; @@ -263,9 +262,7 @@ std::shared_ptr prepareWindowAggregator( Names{"_tp_time"}, columns_list); Streaming::WindowParamsPtr window_params = Streaming::WindowParams::create(table_function_description); - - params->window_params = window_params; - Streaming::Aggregator aggregator(*params); + params.window_params = window_params; for (size_t i = 0; i < key_site.size(); ++i) { key_columns.push_back(nullptr); @@ -348,24 +345,24 @@ void executeAggregatorTest( Columns columns_first, columns_second; ColumnRawPtrs key_columns, aggregate_columns; Streaming::AggregatedDataVariants hash_map; - std::shared_ptr params + Streaming::Aggregator::Params params = aggregator_func(10, key_site, key_columns, first_input_data, aggregate_columns, columns_first, is_changelog_input); if (is_changelog_output) - params->tracking_updates_type = Streaming::TrackingUpdatesType::UpdatesWithRetract; + params.tracking_updates_type = Streaming::TrackingUpdatesType::UpdatesWithRetract; if (is_update_output) - params->tracking_updates_type = Streaming::TrackingUpdatesType::Updates; - auto aggregator=std::make_shared(*params); + params.tracking_updates_type = Streaming::TrackingUpdatesType::Updates; + auto aggregator = Streaming::Aggregator(params); Aggregator::AggregateColumns aggregate_column{aggregate_columns}; - aggregator->executeOnBlock(columns_first, 0, 10, hash_map, key_columns, aggregate_column); + aggregator.executeOnBlock(columns_first, 0, 10, hash_map, key_columns, aggregate_column); auto block_first - = convertToResult(*aggregator, hash_map, first_input_data, is_changelog_output, is_update_output, is_window_aggregator); + = convertToResult(aggregator, hash_map, first_input_data, is_changelog_output, is_update_output, is_window_aggregator); setColumnsData(columns_second, second_input_data, aggregate_columns); if (is_changelog_output) - aggregator->executeAndRetractOnBlock(columns_second, 0, 10, hash_map, key_columns, aggregate_column); + aggregator.executeAndRetractOnBlock(columns_second, 0, 10, hash_map, key_columns, aggregate_column); else - aggregator->executeOnBlock(columns_second, 0, 10, hash_map, key_columns, aggregate_column); + aggregator.executeOnBlock(columns_second, 0, 10, hash_map, key_columns, aggregate_column); auto block_second - = convertToResult(*aggregator, hash_map, second_input_data, is_changelog_output, is_update_output, is_window_aggregator); + = convertToResult(aggregator, hash_map, second_input_data, is_changelog_output, is_update_output, is_window_aggregator); checkAggregationResults(block_second, results, position++); } }