Skip to content

Commit

Permalink
rebase and fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Sep 9, 2024
1 parent 04a553c commit 0f7982b
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,4 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}
}
}

override def supportNativeInputFileRelatedExpr(): Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ private object CHRuleApi {
def injectLegacy(injector: LegacyInjector): Unit = {
// Gluten columnar: Transform rules.
injector.injectTransform(_ => RemoveTransitions)
injector.injectTransform(_ => PushDownInputFileExpression.PreOffload)
injector.injectTransform(c => FallbackOnANSIMode.apply(c.session))
injector.injectTransform(c => FallbackMultiCodegens.apply(c.session))
injector.injectTransform(_ => RewriteSubqueryBroadcast())
Expand All @@ -72,6 +73,7 @@ private object CHRuleApi {
injector.injectTransform(_ => TransformPreOverrides())
injector.injectTransform(_ => RemoveNativeWriteFilesSortAndProject())
injector.injectTransform(c => RewriteTransformer.apply(c.session))
injector.injectTransform(_ => PushDownInputFileExpression.PostOffload)
injector.injectTransform(_ => EnsureLocalSortRequirements)
injector.injectTransform(_ => EliminateLocalSort)
injector.injectTransform(_ => CollapseProjectExecTransformer)
Expand Down
98 changes: 43 additions & 55 deletions cpp-ch/local-engine/Parser/InputFileNameParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,49 +17,44 @@

#include "InputFileNameParser.h"

#include <iostream>
#include <ranges>

#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/QueryPipelineBuilder.h>

namespace local_engine {

namespace local_engine
{
static DB::ITransformingStep::Traits getTraits()
{
return DB::ITransformingStep::Traits
{
{
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = true,
}
};
return DB::ITransformingStep::Traits{
{
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = true,
}};
}

static DB::Block getOutputHeader(const DB::DataStream& input_stream,
const std::optional<String>& file_name,
const std::optional<Int64>& block_start,
const std::optional<Int64>& block_length)
static DB::Block getOutputHeader(
const DB::DataStream & input_stream,
const std::optional<String> & file_name,
const std::optional<Int64> & block_start,
const std::optional<Int64> & block_length)
{
DB::Block output_header = input_stream.header;
if (file_name.has_value())
{
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeString>(), InputFileNameParser::INPUT_FILE_NAME});
}
if (block_start.has_value())
{
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeInt64>(), InputFileNameParser::INPUT_FILE_BLOCK_START});
}
if (block_length.has_value())
{
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeInt64>(), InputFileNameParser::INPUT_FILE_BLOCK_LENGTH});
output_header.insert(
DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeInt64>(), InputFileNameParser::INPUT_FILE_BLOCK_LENGTH});
}
return output_header;
}
Expand All @@ -68,22 +63,20 @@ class InputFileExprProjectTransform : public DB::ISimpleTransform
{
public:
InputFileExprProjectTransform(
const DB::Block& input_header_,
const DB::Block& output_header_,
const std::optional<String>& file_name,
const std::optional<Int64>& block_start,
const std::optional<Int64>& block_length)
: ISimpleTransform(input_header_, output_header_, true)
, file_name(file_name)
, block_start(block_start)
, block_length(block_length)
const DB::Block & input_header_,
const DB::Block & output_header_,
const std::optional<String> & file_name,
const std::optional<Int64> & block_start,
const std::optional<Int64> & block_length)
: ISimpleTransform(input_header_, output_header_, true), file_name(file_name), block_start(block_start), block_length(block_length)
{
}

String getName() const override { return "InputFileExprProjectTransform"; }

void transform(DB::Chunk & chunk) override
{
InputFileNameParser::addInputFileColumnsToChunk(output.getHeader(), chunk, file_name, block_start, block_length);
InputFileNameParser::addInputFileColumnsToChunk(output.getHeader(), chunk, file_name, block_start, block_length);
}

private:
Expand All @@ -96,10 +89,10 @@ class InputFileExprProjectStep : public DB::ITransformingStep
{
public:
InputFileExprProjectStep(
const DB::DataStream& input_stream,
const std::optional<String>& file_name,
const std::optional<Int64>& block_start,
const std::optional<Int64>& block_length)
const DB::DataStream & input_stream,
const std::optional<String> & file_name,
const std::optional<Int64> & block_start,
const std::optional<Int64> & block_length)
: ITransformingStep(input_stream, getOutputHeader(input_stream, file_name, block_start, block_length), getTraits(), true)
, file_name(file_name)
, block_start(block_start)
Expand All @@ -108,11 +101,11 @@ class InputFileExprProjectStep : public DB::ITransformingStep
}

String getName() const override { return "InputFileExprProjectStep"; }
void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings & /*settings*/) override

void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings & /*settings*/) override
{
pipeline.addSimpleTransform(
[&](const DB::Block & header)
{
[&](const DB::Block & header) {
return std::make_shared<InputFileExprProjectTransform>(header, output_stream->header, file_name, block_start, block_length);
});
}
Expand Down Expand Up @@ -163,26 +156,23 @@ void InputFileNameParser::addInputFileColumnsToChunk(
if (!file_name.has_value())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Input file name is not set");
auto type_string = std::make_shared<DB::DataTypeString>();
auto file_name_column = type_string->createColumn();
file_name_column->insertMany(file_name.value(), chunk.getNumRows());
auto file_name_column = type_string->createColumnConst(chunk.getNumRows(), file_name.value());
output_columns.insert(output_columns.begin() + i, std::move(file_name_column));
}
else if (column.name == INPUT_FILE_BLOCK_START)
{
if (!block_start.has_value())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "block_start is not set");
auto type_int64 = std::make_shared<DB::DataTypeInt64>();
auto block_start_column = type_int64->createColumn();
block_start_column->insertMany(block_start.value(), chunk.getNumRows());
auto block_start_column = type_int64->createColumnConst(chunk.getNumRows(), block_start.value());
output_columns.insert(output_columns.begin() + i, std::move(block_start_column));
}
else if (column.name == INPUT_FILE_BLOCK_LENGTH)
{
if (!block_length.has_value())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "block_length is not set");
auto type_int64 = std::make_shared<DB::DataTypeInt64>();
auto block_length_column = type_int64->createColumn();
block_length_column->insertMany(block_length.value(), chunk.getNumRows());
auto block_length_column = type_int64->createColumnConst(chunk.getNumRows(), block_length.value());
output_columns.insert(output_columns.begin() + i, std::move(block_length_column));
}
}
Expand All @@ -198,27 +188,25 @@ DB::Block InputFileNameParser::removeInputFileColumn(const DB::Block & block)
{
const auto & columns = block.getColumnsWithTypeAndName();
DB::ColumnsWithTypeAndName result_columns;
for (const auto & column : columns) {
for (const auto & column : columns)
if (!INPUT_FILE_COLUMNS_SET.contains(column.name))
{
result_columns.push_back(column);
}
}
return result_columns;
}

void InputFileNameParser::addInputFileProjectStep(DB::QueryPlan & plan)
std::optional<DB::IQueryPlanStep *> InputFileNameParser::addInputFileProjectStep(DB::QueryPlan & plan)
{
if (!file_name.has_value() && !block_start.has_value() && !block_length.has_value()) return;
if (!file_name.has_value() && !block_start.has_value() && !block_length.has_value())
return std::nullopt;
auto step = std::make_unique<InputFileExprProjectStep>(plan.getCurrentDataStream(), file_name, block_start, block_length);
step->setStepDescription("Input file expression project");
std::optional<DB::IQueryPlanStep *> result = step.get();
plan.addStep(std::move(step));
return result;
}

void InputFileNameParser::addInputFileColumnsToChunk(const DB::Block & header, DB::Chunk & chunk)
{
addInputFileColumnsToChunk(header, chunk, file_name, block_start, block_length);
}


}
78 changes: 35 additions & 43 deletions cpp-ch/local-engine/Parser/InputFileNameParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,47 +24,39 @@ namespace DB

namespace local_engine
{
class InputFileNameParser
{
public:
static inline const String& INPUT_FILE_NAME = "input_file_name";
static inline const String& INPUT_FILE_BLOCK_START = "input_file_block_start";
static inline const String& INPUT_FILE_BLOCK_LENGTH = "input_file_block_length";
static inline std::unordered_set INPUT_FILE_COLUMNS_SET = {
INPUT_FILE_NAME, INPUT_FILE_BLOCK_START, INPUT_FILE_BLOCK_LENGTH
};

static bool hasInputFileNameColumn(const DB::Block& block);
static bool hasInputFileBlockStartColumn(const DB::Block& block);
static bool hasInputFileBlockLengthColumn(const DB::Block& block);
static bool containsInputFileColumns(const DB::Block& block);
static DB::Block removeInputFileColumn(const DB::Block& block);
static void addInputFileColumnsToChunk(const DB::Block& header, DB::Chunk& chunk,
const std::optional<String>& file_name,
const std::optional<Int64>& block_start,
const std::optional<Int64>& block_length);


void setFileName(const String& file_name)
{
this->file_name = file_name;
}

void setBlockStart(const Int64 block_start)
{
this->block_start = block_start;
}

void setBlockLength(const Int64 block_length)
{
this->block_length = block_length;
}

void addInputFileProjectStep(DB::QueryPlan& plan);
void addInputFileColumnsToChunk(const DB::Block & header, DB::Chunk & chunk);
private:
std::optional<String> file_name;
std::optional<Int64> block_start;
std::optional<Int64> block_length;
};
class InputFileNameParser
{
public:
static inline const String & INPUT_FILE_NAME = "input_file_name";
static inline const String & INPUT_FILE_BLOCK_START = "input_file_block_start";
static inline const String & INPUT_FILE_BLOCK_LENGTH = "input_file_block_length";
static inline std::unordered_set INPUT_FILE_COLUMNS_SET = {INPUT_FILE_NAME, INPUT_FILE_BLOCK_START, INPUT_FILE_BLOCK_LENGTH};

static bool hasInputFileNameColumn(const DB::Block & block);
static bool hasInputFileBlockStartColumn(const DB::Block & block);
static bool hasInputFileBlockLengthColumn(const DB::Block & block);
static bool containsInputFileColumns(const DB::Block & block);
static DB::Block removeInputFileColumn(const DB::Block & block);
static void addInputFileColumnsToChunk(
const DB::Block & header,
DB::Chunk & chunk,
const std::optional<String> & file_name,
const std::optional<Int64> & block_start,
const std::optional<Int64> & block_length);


void setFileName(const String & file_name) { this->file_name = file_name; }

void setBlockStart(const Int64 block_start) { this->block_start = block_start; }

void setBlockLength(const Int64 block_length) { this->block_length = block_length; }

[[nodiscard]] std::optional<DB::IQueryPlanStep *> addInputFileProjectStep(DB::QueryPlan & plan);
void addInputFileColumnsToChunk(const DB::Block & header, DB::Chunk & chunk);

private:
std::optional<String> file_name;
std::optional<Int64> block_start;
std::optional<Int64> block_length;
};
} // local_engine
10 changes: 4 additions & 6 deletions cpp-ch/local-engine/Parser/LocalExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

#include "LocalExecutor.h"

#include <Common/QueryContext.h>
#include <QueryPipeline/printPipeline.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Core/Settings.h>
#include <Parser/SerializedPlanParser.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <QueryPipeline/printPipeline.h>
#include <Common/QueryContext.h>

using namespace DB;
namespace local_engine
Expand Down Expand Up @@ -123,7 +123,7 @@ void LocalExecutor::execute()
{
chassert(query_pipeline_builder);
push_executor = query_pipeline_builder->execute();
push_executor->execute(local_engine::QueryContextManager::instance().currentQueryContext()->getSettingsRef().max_threads, false);
push_executor->execute(local_engine::QueryContext::instance().currentQueryContext()->getSettingsRef().max_threads, false);
}

Block LocalExecutor::getHeader()
Expand All @@ -139,9 +139,7 @@ LocalExecutor::LocalExecutor(QueryPlanPtr query_plan, QueryPipelineBuilderPtr pi
, current_query_plan(std::move(query_plan))
{
if (current_executor)
{
fallback_mode = true;
}
// only need record last executor
current_executor = this;
}
Expand Down
12 changes: 6 additions & 6 deletions cpp-ch/local-engine/Parser/LocalExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
*/

#pragma once
#include <Common/BlockIterator.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Parser/CHColumnToSparkRow.h>
#include <Parser/RelMetric.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/BlockIterator.h>

namespace local_engine
{
Expand Down Expand Up @@ -56,6 +56,8 @@ class LocalExecutor : public BlockIterator
void setExtraPlanHolder(std::vector<DB::QueryPlanPtr> & extra_plan_holder_) { extra_plan_holder = std::move(extra_plan_holder_); }

private:
// In the case of fallback, there may be multiple native pipelines in one stage. Can determine whether a fallback has occurred by whether a LocalExecutor already exists.
// Updated when the LocalExecutor is created and reset when the task ends
static thread_local LocalExecutor * current_executor;
std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(const DB::Block & block) const;
void initPullingPipelineExecutor();
Expand All @@ -78,5 +80,3 @@ class LocalExecutor : public BlockIterator
bool fallback_mode = false;
};
}


5 changes: 3 additions & 2 deletions cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <Parser/TypeParser.h>
#include <Storages/MergeTree/StorageMergeTreeFactory.h>
#include <google/protobuf/wrappers.pb.h>
#include <Storages/Mergetree/MetaDataHelper.h>
#include <Poco/StringTokenizer.h>
#include <Common/CHUtil.h>
#include <Parser/InputFileNameParser.h>
Expand Down Expand Up @@ -154,7 +153,9 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
if (remove_null_step)
steps.emplace_back(remove_null_step);
}
input_file_name_parser.addInputFileProjectStep(*query_plan);
auto step = input_file_name_parser.addInputFileProjectStep(*query_plan);
if (step.has_value())
steps.emplace_back(step.value());
return query_plan;
}

Expand Down
Loading

0 comments on commit 0f7982b

Please sign in to comment.