diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index 2ad3c048c0fc..3115950cdf09 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -87,14 +87,14 @@ namespace DB { namespace ErrorCodes { - extern const int LOGICAL_ERROR; - extern const int UNKNOWN_TYPE; - extern const int BAD_ARGUMENTS; - extern const int NO_SUCH_DATA_PART; - extern const int UNKNOWN_FUNCTION; - extern const int CANNOT_PARSE_PROTOBUF_SCHEMA; - extern const int ILLEGAL_TYPE_OF_ARGUMENT; - extern const int INVALID_JOIN_ON_EXPRESSION; +extern const int LOGICAL_ERROR; +extern const int UNKNOWN_TYPE; +extern const int BAD_ARGUMENTS; +extern const int NO_SUCH_DATA_PART; +extern const int UNKNOWN_FUNCTION; +extern const int CANNOT_PARSE_PROTOBUF_SCHEMA; +extern const int ILLEGAL_TYPE_OF_ARGUMENT; +extern const int INVALID_JOIN_ON_EXPRESSION; } } @@ -144,16 +144,13 @@ void SerializedPlanParser::parseExtensions( if (extension.has_extension_function()) { function_mapping.emplace( - std::to_string(extension.extension_function().function_anchor()), - extension.extension_function().name()); + std::to_string(extension.extension_function().function_anchor()), extension.extension_function().name()); } } } std::shared_ptr SerializedPlanParser::expressionsToActionsDAG( - const std::vector & expressions, - const Block & header, - const Block & read_schema) + const std::vector & expressions, const Block & header, const Block & read_schema) { auto actions_dag = std::make_shared(blockToNameAndTypeList(header)); NamesWithAliases required_columns; @@ -259,8 +256,8 @@ std::string getDecimalFunction(const substrait::Type_Decimal & decimal, bool nul bool SerializedPlanParser::isReadRelFromJava(const substrait::ReadRel & rel) { - return rel.has_local_files() && rel.local_files().items().size() == 1 && rel.local_files().items().at(0).uri_file().starts_with( - "iterator"); + return rel.has_local_files() && rel.local_files().items().size() == 1 + && rel.local_files().items().at(0).uri_file().starts_with("iterator"); } bool SerializedPlanParser::isReadFromMergeTree(const substrait::ReadRel & rel) @@ -380,7 +377,7 @@ DataTypePtr wrapNullableType(bool nullable, DataTypePtr nested_type) return nested_type; } -QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan& plan) +QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan) { logDebugMessage(plan, "substrait plan"); parseExtensions(plan.extensions()); @@ -587,9 +584,7 @@ SerializedPlanParser::getFunctionName(const std::string & function_signature, co { if (args.size() != 2) throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "Spark function extract requires two args, function:{}", - function.ShortDebugString()); + ErrorCodes::BAD_ARGUMENTS, "Spark function extract requires two args, function:{}", function.ShortDebugString()); // Get the first arg: field const auto & extract_field = args.at(0); @@ -705,9 +700,7 @@ void SerializedPlanParser::parseArrayJoinArguments( /// The argument number of arrayJoin(converted from Spark explode/posexplode) should be 1 if (scalar_function.arguments_size() != 1) throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "Argument number of arrayJoin should be 1 instead of {}", - scalar_function.arguments_size()); + ErrorCodes::BAD_ARGUMENTS, "Argument number of arrayJoin should be 1 instead of {}", scalar_function.arguments_size()); auto function_name_copy = function_name; parseFunctionArguments(actions_dag, parsed_args, function_name_copy, scalar_function); @@ -746,11 +739,7 @@ void SerializedPlanParser::parseArrayJoinArguments( } ActionsDAG::NodeRawConstPtrs SerializedPlanParser::parseArrayJoinWithDAG( - const substrait::Expression & rel, - std::vector & result_names, - ActionsDAGPtr actions_dag, - bool keep_result, - bool position) + const substrait::Expression & rel, std::vector & result_names, ActionsDAGPtr actions_dag, bool keep_result, bool position) { if (!rel.has_scalar_function()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "The root of expression should be a scalar function:\n {}", rel.DebugString()); @@ -774,7 +763,8 @@ ActionsDAG::NodeRawConstPtrs SerializedPlanParser::parseArrayJoinWithDAG( auto tuple_element_builder = FunctionFactory::instance().get("sparkTupleElement", context); auto tuple_index_type = std::make_shared(); - auto add_tuple_element = [&](const ActionsDAG::Node * tuple_node, size_t i) -> const ActionsDAG::Node * { + auto add_tuple_element = [&](const ActionsDAG::Node * tuple_node, size_t i) -> const ActionsDAG::Node * + { ColumnWithTypeAndName index_col(tuple_index_type->createColumnConst(1, i), tuple_index_type, getUniqueName(std::to_string(i))); const auto * index_node = &actions_dag->addColumn(std::move(index_col)); auto result_name = "sparkTupleElement(" + tuple_node->result_name + ", " + index_node->result_name + ")"; @@ -866,10 +856,7 @@ ActionsDAG::NodeRawConstPtrs SerializedPlanParser::parseArrayJoinWithDAG( } const ActionsDAG::Node * SerializedPlanParser::parseFunctionWithDAG( - const substrait::Expression & rel, - std::string & result_name, - ActionsDAGPtr actions_dag, - bool keep_result) + const substrait::Expression & rel, std::string & result_name, ActionsDAGPtr actions_dag, bool keep_result) { if (!rel.has_scalar_function()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "the root of expression should be a scalar function:\n {}", rel.DebugString()); @@ -884,10 +871,7 @@ const ActionsDAG::Node * SerializedPlanParser::parseFunctionWithDAG( if (auto func_parser = FunctionParserFactory::instance().tryGet(func_name, this)) { LOG_DEBUG( - &Poco::Logger::get("SerializedPlanParser"), - "parse function {} by function parser: {}", - func_name, - func_parser->getName()); + &Poco::Logger::get("SerializedPlanParser"), "parse function {} by function parser: {}", func_name, func_parser->getName()); const auto * result_node = func_parser->parse(scalar_function, actions_dag); if (keep_result) actions_dag->addOrReplaceInOutputs(*result_node); @@ -956,12 +940,10 @@ const ActionsDAG::Node * SerializedPlanParser::parseFunctionWithDAG( UInt32 precision = rel.scalar_function().output_type().decimal().precision(); UInt32 scale = rel.scalar_function().output_type().decimal().scale(); auto uint32_type = std::make_shared(); - new_args.emplace_back( - &actions_dag->addColumn( - ColumnWithTypeAndName(uint32_type->createColumnConst(1, precision), uint32_type, getUniqueName(toString(precision))))); - new_args.emplace_back( - &actions_dag->addColumn( - ColumnWithTypeAndName(uint32_type->createColumnConst(1, scale), uint32_type, getUniqueName(toString(scale))))); + new_args.emplace_back(&actions_dag->addColumn( + ColumnWithTypeAndName(uint32_type->createColumnConst(1, precision), uint32_type, getUniqueName(toString(precision))))); + new_args.emplace_back(&actions_dag->addColumn( + ColumnWithTypeAndName(uint32_type->createColumnConst(1, scale), uint32_type, getUniqueName(toString(scale))))); args = std::move(new_args); } else if (startsWith(function_signature, "make_decimal:")) @@ -976,12 +958,10 @@ const ActionsDAG::Node * SerializedPlanParser::parseFunctionWithDAG( UInt32 precision = rel.scalar_function().output_type().decimal().precision(); UInt32 scale = rel.scalar_function().output_type().decimal().scale(); auto uint32_type = std::make_shared(); - new_args.emplace_back( - &actions_dag->addColumn( - ColumnWithTypeAndName(uint32_type->createColumnConst(1, precision), uint32_type, getUniqueName(toString(precision))))); - new_args.emplace_back( - &actions_dag->addColumn( - ColumnWithTypeAndName(uint32_type->createColumnConst(1, scale), uint32_type, getUniqueName(toString(scale))))); + new_args.emplace_back(&actions_dag->addColumn( + ColumnWithTypeAndName(uint32_type->createColumnConst(1, precision), uint32_type, getUniqueName(toString(precision))))); + new_args.emplace_back(&actions_dag->addColumn( + ColumnWithTypeAndName(uint32_type->createColumnConst(1, scale), uint32_type, getUniqueName(toString(scale))))); args = std::move(new_args); } @@ -999,9 +979,8 @@ const ActionsDAG::Node * SerializedPlanParser::parseFunctionWithDAG( actions_dag, function_node, // as stated in isTypeMatched, currently we don't change nullability of the result type - function_node->result_type->isNullable() - ? local_engine::wrapNullableType(true, result_type)->getName() - : local_engine::removeNullable(result_type)->getName(), + function_node->result_type->isNullable() ? local_engine::wrapNullableType(true, result_type)->getName() + : local_engine::removeNullable(result_type)->getName(), function_node->result_name, CastType::accurateOrNull); } @@ -1011,9 +990,8 @@ const ActionsDAG::Node * SerializedPlanParser::parseFunctionWithDAG( actions_dag, function_node, // as stated in isTypeMatched, currently we don't change nullability of the result type - function_node->result_type->isNullable() - ? local_engine::wrapNullableType(true, result_type)->getName() - : local_engine::removeNullable(result_type)->getName(), + function_node->result_type->isNullable() ? local_engine::wrapNullableType(true, result_type)->getName() + : local_engine::removeNullable(result_type)->getName(), function_node->result_name); } } @@ -1159,9 +1137,7 @@ void SerializedPlanParser::parseFunctionArgument( } const ActionsDAG::Node * SerializedPlanParser::parseFunctionArgument( - ActionsDAGPtr & actions_dag, - const std::string & function_name, - const substrait::FunctionArgument & arg) + ActionsDAGPtr & actions_dag, const std::string & function_name, const substrait::FunctionArgument & arg) { const ActionsDAG::Node * res; if (arg.value().has_scalar_function()) @@ -1189,11 +1165,8 @@ std::pair SerializedPlanParser::convertStructFieldType(const } auto type_id = type->getTypeId(); - if (type_id == TypeIndex::UInt8 || type_id == TypeIndex::UInt16 || type_id == TypeIndex::UInt32 - || type_id == TypeIndex::UInt64) - { + if (type_id == TypeIndex::UInt8 || type_id == TypeIndex::UInt16 || type_id == TypeIndex::UInt32 || type_id == TypeIndex::UInt64) return {type, field}; - } UINT_CONVERT(type, field, Int8) UINT_CONVERT(type, field, Int16) UINT_CONVERT(type, field, Int32) @@ -1203,11 +1176,7 @@ std::pair SerializedPlanParser::convertStructFieldType(const } ActionsDAGPtr SerializedPlanParser::parseFunction( - const Block & header, - const substrait::Expression & rel, - std::string & result_name, - ActionsDAGPtr actions_dag, - bool keep_result) + const Block & header, const substrait::Expression & rel, std::string & result_name, ActionsDAGPtr actions_dag, bool keep_result) { if (!actions_dag) actions_dag = std::make_shared(blockToNameAndTypeList(header)); @@ -1217,11 +1186,7 @@ ActionsDAGPtr SerializedPlanParser::parseFunction( } ActionsDAGPtr SerializedPlanParser::parseFunctionOrExpression( - const Block & header, - const substrait::Expression & rel, - std::string & result_name, - ActionsDAGPtr actions_dag, - bool keep_result) + const Block & header, const substrait::Expression & rel, std::string & result_name, ActionsDAGPtr actions_dag, bool keep_result) { if (!actions_dag) actions_dag = std::make_shared(blockToNameAndTypeList(header)); @@ -1303,7 +1268,8 @@ ActionsDAGPtr SerializedPlanParser::parseJsonTuple( = &actions_dag->addFunction(json_extract_builder, {json_expr_node, extract_expr_node}, json_extract_result_name); auto tuple_element_builder = FunctionFactory::instance().get("sparkTupleElement", context); auto tuple_index_type = std::make_shared(); - auto add_tuple_element = [&](const ActionsDAG::Node * tuple_node, size_t i) -> const ActionsDAG::Node * { + auto add_tuple_element = [&](const ActionsDAG::Node * tuple_node, size_t i) -> const ActionsDAG::Node * + { ColumnWithTypeAndName index_col(tuple_index_type->createColumnConst(1, i), tuple_index_type, getUniqueName(std::to_string(i))); const auto * index_node = &actions_dag->addColumn(std::move(index_col)); auto result_name = "sparkTupleElement(" + tuple_node->result_name + ", " + index_node->result_name + ")"; @@ -1528,9 +1494,7 @@ std::pair SerializedPlanParser::parseLiteral(const substrait } default: { throw Exception( - ErrorCodes::UNKNOWN_TYPE, - "Unsupported spark literal type {}", - magic_enum::enum_name(literal.literal_type_case())); + ErrorCodes::UNKNOWN_TYPE, "Unsupported spark literal type {}", magic_enum::enum_name(literal.literal_type_case())); } } return std::make_pair(std::move(type), std::move(field)); @@ -1732,8 +1696,7 @@ substrait::ReadRel::ExtensionTable SerializedPlanParser::parseExtensionTable(con { substrait::ReadRel::ExtensionTable extension_table; google::protobuf::io::CodedInputStream coded_in( - reinterpret_cast(split_info.data()), - static_cast(split_info.size())); + reinterpret_cast(split_info.data()), static_cast(split_info.size())); coded_in.SetRecursionLimit(100000); auto ok = extension_table.ParseFromCodedStream(&coded_in); @@ -1779,17 +1742,17 @@ std::unique_ptr SerializedPlanParser::createExecutor(DB::QueryPla optimization_settings, BuildQueryPipelineSettings{ .actions_settings - = ExpressionActionsSettings{.can_compile_expressions = true, .min_count_to_compile_expression = 3, - .compile_expressions = CompileExpressions::yes}, + = ExpressionActionsSettings{.can_compile_expressions = true, .min_count_to_compile_expression = 3, .compile_expressions = CompileExpressions::yes}, .process_list_element = query_status}); - QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder)); + QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder)); LOG_INFO(logger, "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0); - LOG_DEBUG(logger, "clickhouse plan [optimization={}]:\n{}", settings.query_plan_enable_optimizations, - PlanUtil::explainPlan(*query_plan)); + LOG_DEBUG( + logger, "clickhouse plan [optimization={}]:\n{}", settings.query_plan_enable_optimizations, PlanUtil::explainPlan(*query_plan)); LOG_DEBUG(logger, "clickhouse pipeline:\n{}", QueryPipelineUtil::explainPipeline(pipeline)); - return std::make_unique(context, std::move(query_plan), std::move(pipeline), query_plan->getCurrentDataStream().header.cloneEmpty()); + return std::make_unique( + context, std::move(query_plan), std::move(pipeline), query_plan->getCurrentDataStream().header.cloneEmpty()); } QueryPlanPtr SerializedPlanParser::parse(const std::string_view & plan) @@ -1829,8 +1792,7 @@ QueryPlanPtr SerializedPlanParser::parseJson(const std::string_view & json_plan) return parse(plan); } -SerializedPlanParser::SerializedPlanParser(const ContextPtr & context_) - : context(context_) +SerializedPlanParser::SerializedPlanParser(const ContextPtr & context_) : context(context_) { } @@ -1839,13 +1801,10 @@ ContextMutablePtr SerializedPlanParser::global_context = nullptr; Context::ConfigurationPtr SerializedPlanParser::config = nullptr; void SerializedPlanParser::collectJoinKeys( - const substrait::Expression & condition, - std::vector> & join_keys, - int32_t right_key_start) + const substrait::Expression & condition, std::vector> & join_keys, int32_t right_key_start) { auto condition_name = getFunctionName( - function_mapping.at(std::to_string(condition.scalar_function().function_reference())), - condition.scalar_function()); + function_mapping.at(std::to_string(condition.scalar_function().function_reference())), condition.scalar_function()); if (condition_name == "and") { collectJoinKeys(condition.scalar_function().arguments(0).value(), join_keys, right_key_start); @@ -1895,8 +1854,8 @@ ASTPtr ASTParser::parseToAST(const Names & names, const substrait::Expression & auto substrait_name = function_signature.substr(0, function_signature.find(':')); auto func_parser = FunctionParserFactory::instance().tryGet(substrait_name, plan_parser); - String function_name = func_parser ? func_parser->getName() - : SerializedPlanParser::getFunctionName(function_signature, scalar_function); + String function_name + = func_parser ? func_parser->getName() : SerializedPlanParser::getFunctionName(function_signature, scalar_function); ASTs ast_args; parseFunctionArgumentsToAST(names, scalar_function, ast_args); @@ -1908,9 +1867,7 @@ ASTPtr ASTParser::parseToAST(const Names & names, const substrait::Expression & } void ASTParser::parseFunctionArgumentsToAST( - const Names & names, - const substrait::Expression_ScalarFunction & scalar_function, - ASTs & ast_args) + const Names & names, const substrait::Expression_ScalarFunction & scalar_function, ASTs & ast_args) { const auto & args = scalar_function.arguments(); @@ -2053,7 +2010,8 @@ ASTPtr ASTParser::parseArgumentToAST(const Names & names, const substrait::Expre } } -void SerializedPlanParser::removeNullableForRequiredColumns(const std::set & require_columns, const ActionsDAGPtr & actions_dag) const +void SerializedPlanParser::removeNullableForRequiredColumns( + const std::set & require_columns, const ActionsDAGPtr & actions_dag) const { for (const auto & item : require_columns) { @@ -2068,9 +2026,7 @@ void SerializedPlanParser::removeNullableForRequiredColumns(const std::set & columns, - ActionsDAGPtr actions_dag, - std::map & nullable_measure_names) + const std::vector & columns, ActionsDAGPtr actions_dag, std::map & nullable_measure_names) { for (const auto & item : columns) { @@ -2214,14 +2170,15 @@ Block & LocalExecutor::getHeader() return header; } -LocalExecutor::LocalExecutor(const ContextPtr & context_, QueryPlanPtr query_plan, QueryPipeline&& pipeline, const Block & header_) +LocalExecutor::LocalExecutor(const ContextPtr & context_, QueryPlanPtr query_plan, QueryPipeline && pipeline, const Block & header_) : query_pipeline(std::move(pipeline)) , executor(std::make_unique(query_pipeline)) , header(header_) , context(context_) , ch_column_to_spark_row(std::make_unique()) , current_query_plan(std::move(query_plan)) -{} +{ +} std::string LocalExecutor::dumpPipeline() const { @@ -2247,12 +2204,8 @@ std::string LocalExecutor::dumpPipeline() const } NonNullableColumnsResolver::NonNullableColumnsResolver( - const Block & header_, - SerializedPlanParser & parser_, - const substrait::Expression & cond_rel_) - : header(header_) - , parser(parser_) - , cond_rel(cond_rel_) + const Block & header_, SerializedPlanParser & parser_, const substrait::Expression & cond_rel_) + : header(header_), parser(parser_), cond_rel(cond_rel_) { } @@ -2324,8 +2277,7 @@ void NonNullableColumnsResolver::visitNonNullable(const substrait::Expression & } std::string NonNullableColumnsResolver::safeGetFunctionName( - const std::string & function_signature, - const substrait::Expression_ScalarFunction & function) const + const std::string & function_signature, const substrait::Expression_ScalarFunction & function) const { try {