createExecutor
baibaichen committed Jun 24, 2024
1 parent 65fbf31 commit bf04296
Showing 13 changed files with 921 additions and 500 deletions.
17 changes: 6 additions & 11 deletions cpp-ch/local-engine/Common/CHUtil.cpp
@@ -77,6 +77,7 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_TYPE;
extern const int CANNOT_PARSE_PROTOBUF_SCHEMA;
}
}

@@ -466,17 +467,17 @@ String QueryPipelineUtil::explainPipeline(DB::QueryPipeline & pipeline)

using namespace DB;

std::map<std::string, std::string> BackendInitializerUtil::getBackendConfMap(std::string * plan)
std::map<std::string, std::string> BackendInitializerUtil::getBackendConfMap(const std::string & plan)
{
std::map<std::string, std::string> ch_backend_conf;
if (plan == nullptr)
if (plan.empty())
return ch_backend_conf;

/// Parse backend configs from plan extensions
do
{
auto plan_ptr = std::make_unique<substrait::Plan>();
auto success = plan_ptr->ParseFromString(*plan);
auto success = plan_ptr->ParseFromString(plan);
if (!success)
break;

@@ -841,14 +842,8 @@ void BackendInitializerUtil::initCompiledExpressionCache(DB::Context::Configurat
#endif
}

void BackendInitializerUtil::init_json(std::string * plan_json)
{
auto plan_ptr = std::make_unique<substrait::Plan>();
google::protobuf::util::JsonStringToMessage(plan_json->c_str(), plan_ptr.get());
return init(new String(plan_ptr->SerializeAsString()));
}

void BackendInitializerUtil::init(std::string * plan)
void BackendInitializerUtil::init(const std::string & plan)
{
std::map<std::string, std::string> backend_conf_map = getBackendConfMap(plan);
DB::Context::ConfigurationPtr config = initConfig(backend_conf_map);
@@ -906,7 +901,7 @@ void BackendInitializerUtil::init(std::string * plan)
});
}

void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr & context, std::string * plan)
void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr & context, const std::string & plan)
{
std::map<std::string, std::string> backend_conf_map = getBackendConfMap(plan);

12 changes: 4 additions & 8 deletions cpp-ch/local-engine/Common/CHUtil.h
@@ -137,9 +137,8 @@ class BackendInitializerUtil
/// Initialize two kinds of resources
/// 1. Global-level resources such as global_context/shared_context; note that these can only be initialized once per process lifetime
/// 2. Session-level resources such as settings/configs; these can be initialized multiple times, following the lifetime of the executor/driver
static void init(std::string * plan);
static void init_json(std::string * plan_json);
static void updateConfig(const DB::ContextMutablePtr &, std::string *);
static void init(const std::string & plan);
static void updateConfig(const DB::ContextMutablePtr &, const std::string &);


// Use the Excel text parser
@@ -196,7 +195,7 @@ class BackendInitializerUtil
static void updateNewSettings(const DB::ContextMutablePtr &, const DB::Settings &);


static std::map<std::string, std::string> getBackendConfMap(std::string * plan);
static std::map<std::string, std::string> getBackendConfMap(const std::string & plan);

inline static std::once_flag init_flag;
inline static Poco::Logger * logger;
@@ -283,10 +282,7 @@ class ConcurrentDeque
return deq.empty();
}

std::deque<T> unsafeGet()
{
return deq;
}
std::deque<T> unsafeGet() { return deq; }

private:
std::deque<T> deq;
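With the pointer parameters replaced by const references, callers now pass the serialized plan directly and signal "no plan" with an empty string rather than a null pointer. A minimal call-site sketch, assuming the class lives in the local_engine namespace and that plan_str holds a binary-serialized substrait::Plan (both names are illustrative):

// Hypothetical call site for the new API.
void initBackend(const std::string & plan_str)
{
    // Was: BackendInitializerUtil::init(&plan_str); the empty-string check replaces the old nullptr check.
    local_engine::BackendInitializerUtil::init(plan_str);
}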
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/CHColumnToSparkRow.cpp
@@ -453,7 +453,7 @@ std::unique_ptr<SparkRowInfo> CHColumnToSparkRow::convertCHColumnToSparkRow(cons
if (!block.columns())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "A block with empty columns");
std::unique_ptr<SparkRowInfo> spark_row_info = std::make_unique<SparkRowInfo>(block, masks);
spark_row_info->setBufferAddress(reinterpret_cast<char *>(alloc(spark_row_info->getTotalBytes(), 64)));
spark_row_info->setBufferAddress(static_cast<char *>(alloc(spark_row_info->getTotalBytes(), 64)));
// spark_row_info->setBufferAddress(alignedAlloc(spark_row_info->getTotalBytes(), 64));
memset(spark_row_info->getBufferAddress(), 0, spark_row_info->getTotalBytes());
for (auto col_idx = 0; col_idx < spark_row_info->getNumCols(); col_idx++)
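Assuming alloc returns a void *, static_cast is the right cast here: converting void * to char * is a standard pointer conversion, so the stronger reinterpret_cast used by the old line is unnecessary.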
166 changes: 69 additions & 97 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -380,13 +380,13 @@ DataTypePtr wrapNullableType(bool nullable, DataTypePtr nested_type)
return nested_type;
}

QueryPlanPtr SerializedPlanParser::parse(std::unique_ptr<substrait::Plan> plan)
QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan& plan)
{
logDebugMessage(*plan, "substrait plan");
parseExtensions(plan->extensions());
if (plan->relations_size() == 1)
logDebugMessage(plan, "substrait plan");
parseExtensions(plan.extensions());
if (plan.relations_size() == 1)
{
auto root_rel = plan->relations().at(0);
auto root_rel = plan.relations().at(0);
if (!root_rel.has_root())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "must have root rel!");
@@ -1747,8 +1747,7 @@ substrait::ReadRel::LocalFiles SerializedPlanParser::parseLocalFiles(const std::
{
substrait::ReadRel::LocalFiles local_files;
google::protobuf::io::CodedInputStream coded_in(
reinterpret_cast<const uint8_t *>(split_info.data()),
static_cast<int>(split_info.size()));
reinterpret_cast<const uint8_t *>(split_info.data()), static_cast<int>(split_info.size()));
coded_in.SetRecursionLimit(100000);

auto ok = local_files.ParseFromCodedStream(&coded_in);
@@ -1758,22 +1757,55 @@
return local_files;
}

std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan)
{
Stopwatch stopwatch;
auto * logger = &Poco::Logger::get("SerializedPlanParser");
const Settings & settings = context->getSettingsRef();

QueryPriorities priorities;
auto query_status = std::make_shared<QueryStatus>(
context,
"",
context->getClientInfo(),
priorities.insert(static_cast<int>(settings.priority)),
CurrentThread::getGroup(),
IAST::QueryKind::Select,
settings,
0);

QueryPlanOptimizationSettings optimization_settings{.optimize_plan = settings.query_plan_enable_optimizations};
auto pipeline_builder = query_plan->buildQueryPipeline(
optimization_settings,
BuildQueryPipelineSettings{
.actions_settings
= ExpressionActionsSettings{.can_compile_expressions = true, .min_count_to_compile_expression = 3,
.compile_expressions = CompileExpressions::yes},
.process_list_element = query_status});
QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder));
LOG_INFO(logger, "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0);

LOG_DEBUG(logger, "clickhouse plan [optimization={}]:\n{}", settings.query_plan_enable_optimizations,
PlanUtil::explainPlan(*query_plan));
LOG_DEBUG(logger, "clickhouse pipeline:\n{}", QueryPipelineUtil::explainPipeline(pipeline));

// Build the header before moving query_plan: the order in which function arguments are
// initialized is unspecified, so reading query_plan in the same call that moves it away
// would risk a use-after-move.
Block header = query_plan->getCurrentDataStream().header.cloneEmpty();
return std::make_unique<LocalExecutor>(context, std::move(query_plan), std::move(pipeline), header);
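Combined with the parse overloads below, the intended call sequence looks roughly like this — a sketch based only on the signatures visible in this commit, with context and serialized as assumed inputs:

SerializedPlanParser parser(context);                          // context: a valid DB::ContextPtr
QueryPlanPtr query_plan = parser.parse(serialized);            // serialized: binary substrait::Plan bytes
auto executor = parser.createExecutor(std::move(query_plan));  // builds the pipeline and the PullingPipelineExecutor
while (executor->hasNext())
    SparkRowInfoPtr rows = executor->next();                   // each batch converted to the Spark row format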
}

QueryPlanPtr SerializedPlanParser::parse(const std::string & plan)
QueryPlanPtr SerializedPlanParser::parse(const std::string_view & plan)
{
auto plan_ptr = std::make_unique<substrait::Plan>();
substrait::Plan s_plan;
/// https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
/// Parsing may fail when the plan is deeply nested (many recursive layers).
/// Here we set a recursion limit large enough to avoid the problem.
/// Such failures are hard to troubleshoot, because the C++ protobuf runtime does not report any useful error information.
google::protobuf::io::CodedInputStream coded_in(reinterpret_cast<const uint8_t *>(plan.data()), static_cast<int>(plan.size()));
coded_in.SetRecursionLimit(100000);

auto ok = plan_ptr->ParseFromCodedStream(&coded_in);
if (!ok)
if (!s_plan.ParseFromCodedStream(&coded_in))
throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from string failed");

auto res = parse(std::move(plan_ptr));
auto res = parse(s_plan);

#ifndef NDEBUG
PlanUtil::checkOuputType(*res);
@@ -1788,13 +1820,13 @@ QueryPlanPtr SerializedPlanParser::parse(const std::string & plan)
return res;
}

QueryPlanPtr SerializedPlanParser::parseJson(const std::string & json_plan)
QueryPlanPtr SerializedPlanParser::parseJson(const std::string_view & json_plan)
{
auto plan_ptr = std::make_unique<substrait::Plan>();
auto s = google::protobuf::util::JsonStringToMessage(absl::string_view(json_plan), plan_ptr.get());
substrait::Plan plan;
auto s = google::protobuf::util::JsonStringToMessage(json_plan, &plan);
if (!s.ok())
throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from json string failed: {}", s.ToString());
return parse(std::move(plan_ptr));
return parse(plan);
}
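For the JSON path, a minimal sketch (the payload below is illustrative only, not taken from this commit):

// An empty substrait plan encoded as JSON, purely for illustration.
constexpr std::string_view json_plan = R"({"relations":[]})";
QueryPlanPtr plan = parser.parseJson(json_plan);  // throws CANNOT_PARSE_PROTOBUF_SCHEMA on malformed input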

SerializedPlanParser::SerializedPlanParser(const ContextPtr & context_)
@@ -2021,12 +2053,11 @@ ASTPtr ASTParser::parseArgumentToAST(const Names & names, const substrait::Expre
}
}

void SerializedPlanParser::removeNullableForRequiredColumns(const std::set<String> & require_columns, ActionsDAGPtr actions_dag)
void SerializedPlanParser::removeNullableForRequiredColumns(const std::set<String> & require_columns, const ActionsDAGPtr & actions_dag) const
{
for (const auto & item : require_columns)
{
const auto * require_node = actions_dag->tryFindInOutputs(item);
if (require_node)
if (const auto * require_node = actions_dag->tryFindInOutputs(item))
{
auto function_builder = FunctionFactory::instance().get("assumeNotNull", context);
ActionsDAG::NodeRawConstPtrs args = {require_node};
@@ -2092,86 +2123,23 @@ LocalExecutor::~LocalExecutor()
}
}


void LocalExecutor::execute(QueryPlanPtr query_plan)
{
Stopwatch stopwatch;

const Settings & settings = context->getSettingsRef();
current_query_plan = std::move(query_plan);
auto * logger = &Poco::Logger::get("LocalExecutor");

QueryPriorities priorities;
auto query_status = std::make_shared<QueryStatus>(
context,
"",
context->getClientInfo(),
priorities.insert(static_cast<int>(settings.priority)),
CurrentThread::getGroup(),
IAST::QueryKind::Select,
settings,
0);

QueryPlanOptimizationSettings optimization_settings{.optimize_plan = settings.query_plan_enable_optimizations};
auto pipeline_builder = current_query_plan->buildQueryPipeline(
optimization_settings,
BuildQueryPipelineSettings{
.actions_settings
= ExpressionActionsSettings{.can_compile_expressions = true, .min_count_to_compile_expression = 3,
.compile_expressions = CompileExpressions::yes},
.process_list_element = query_status});

LOG_DEBUG(logger, "clickhouse plan after optimization:\n{}", PlanUtil::explainPlan(*current_query_plan));
query_pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder));
LOG_DEBUG(logger, "clickhouse pipeline:\n{}", QueryPipelineUtil::explainPipeline(query_pipeline));
auto t_pipeline = stopwatch.elapsedMicroseconds();

executor = std::make_unique<PullingPipelineExecutor>(query_pipeline);
auto t_executor = stopwatch.elapsedMicroseconds() - t_pipeline;
stopwatch.stop();
LOG_INFO(
logger,
"build pipeline {} ms; create executor {} ms;",
t_pipeline / 1000.0,
t_executor / 1000.0);

header = current_query_plan->getCurrentDataStream().header.cloneEmpty();
ch_column_to_spark_row = std::make_unique<CHColumnToSparkRow>();
}

std::unique_ptr<SparkRowInfo> LocalExecutor::writeBlockToSparkRow(Block & block)
std::unique_ptr<SparkRowInfo> LocalExecutor::writeBlockToSparkRow(const Block & block) const
{
return ch_column_to_spark_row->convertCHColumnToSparkRow(block);
}

bool LocalExecutor::hasNext()
{
bool has_next;
try
{
size_t columns = currentBlock().columns();
if (columns == 0 || isConsumed())
{
auto empty_block = header.cloneEmpty();
setCurrentBlock(empty_block);
has_next = executor->pull(currentBlock());
produce();
}
else
{
has_next = true;
}
}
catch (Exception & e)
size_t columns = currentBlock().columns();
if (columns == 0 || isConsumed())
{
LOG_ERROR(
&Poco::Logger::get("LocalExecutor"),
"LocalExecutor run query plan failed with message: {}. Plan Explained: \n{}",
e.message(),
PlanUtil::explainPlan(*current_query_plan));
throw;
auto empty_block = header.cloneEmpty();
setCurrentBlock(empty_block);
bool has_next = executor->pull(currentBlock());
produce();
return has_next;
}
return has_next;
return true;
}

SparkRowInfoPtr LocalExecutor::next()
@@ -2246,12 +2214,16 @@ Block & LocalExecutor::getHeader()
return header;
}

LocalExecutor::LocalExecutor(ContextPtr context_)
: context(context_)
{
}
LocalExecutor::LocalExecutor(const ContextPtr & context_, QueryPlanPtr query_plan, QueryPipeline&& pipeline, const Block & header_)
: query_pipeline(std::move(pipeline))
, executor(std::make_unique<PullingPipelineExecutor>(query_pipeline))
, header(header_)
, context(context_)
, ch_column_to_spark_row(std::make_unique<CHColumnToSparkRow>())
, current_query_plan(std::move(query_plan))
{}
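Note that the new constructor initializes executor from the query_pipeline member it has just moved into. This is well-defined only because non-static data members are initialized in declaration order rather than initializer-list order, so it relies on query_pipeline being declared before executor in the class definition.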

std::string LocalExecutor::dumpPipeline()
std::string LocalExecutor::dumpPipeline() const
{
const auto & processors = query_pipeline.getProcessors();
for (auto & processor : processors)
@@ -2353,7 +2325,7 @@ void NonNullableColumnsResolver::visitNonNullable(const substrait::Expression &

std::string NonNullableColumnsResolver::safeGetFunctionName(
const std::string & function_signature,
const substrait::Expression_ScalarFunction & function)
const substrait::Expression_ScalarFunction & function) const
{
try
{