Skip to content

Commit

Permalink
support native input_file_name, input_file_block_start and input_file…
Browse files Browse the repository at this point in the history
…_block_length
  • Loading branch information
liuneng1994 committed Aug 26, 2024
1 parent f0848ad commit 0e9c9c7
Show file tree
Hide file tree
Showing 15 changed files with 397 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ class GlutenClickHouseDeltaParquetWriteSuite

val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile])
assert(addFiles.size == 4)
assert(addFiles.size == 6)
}

val sql2 =
Expand Down Expand Up @@ -420,7 +420,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val parquetScan = scanExec.head
val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile])
assert(addFiles.size == 4)
assert(addFiles.size == 6)
}

{
Expand Down Expand Up @@ -985,7 +985,7 @@ class GlutenClickHouseDeltaParquetWriteSuite

val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile])
assert(addFiles.size == 4)
assert(addFiles.size == 6)
}

val clickhouseTable = DeltaTable.forPath(spark, dataPath)
Expand All @@ -1007,7 +1007,7 @@ class GlutenClickHouseDeltaParquetWriteSuite

val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile])
assert(addFiles.size == 3)
assert(addFiles.size == 6)
}

val df = spark.read
Expand Down Expand Up @@ -1042,7 +1042,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val parquetScan = scanExec.head
val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile])
assert(addFiles.size == 4)
assert(addFiles.size == 6)

val clickhouseTable = DeltaTable.forPath(spark, dataPath)
clickhouseTable.delete("mod(l_orderkey, 3) = 2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,28 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
)
}
}

test("function_input_file_expr") {
withTable("test_table") {
sql("create table test_table(a int) using parquet")
sql("insert into test_table values(1)")
compareResultsAgainstVanillaSpark(
"""
|select a,input_file_name(), input_file_block_start(),
|input_file_block_length() from test_table
|""".stripMargin,
true,
{ _ => }
)
compareResultsAgainstVanillaSpark(
"""
|select input_file_name(), input_file_block_start(),
|input_file_block_length() from test_table
|""".stripMargin,
true,
{ _ => }
)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
case scanExec: BasicScanExecTransformer => scanExec
}
assert(plans.size == 1)
// 1 block keep in SubstraitFileStep, and 5 blocks keep in other steps
assert(plans.head.metrics("numOutputRows").value === 6 * parquetMaxBlockSize)
// the value is different from multiple versions of spark
assert(plans.head.metrics("numOutputRows").value % parquetMaxBlockSize == 0)
assert(plans.head.metrics("outputVectors").value === 1)
assert(plans.head.metrics("outputBytes").value > 0)
}
Expand Down
224 changes: 224 additions & 0 deletions cpp-ch/local-engine/Parser/InputFileNameParser.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "InputFileNameParser.h"

#include <iostream>
#include <ranges>

#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/QueryPipelineBuilder.h>

namespace local_engine {

static DB::ITransformingStep::Traits getTraits()
{
return DB::ITransformingStep::Traits
{
{
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = true,
}
};
}

static DB::Block getOutputHeader(const DB::DataStream& input_stream,
const std::optional<String>& file_name,
const std::optional<Int64>& block_start,
const std::optional<Int64>& block_length)
{
DB::Block output_header = input_stream.header;
if (file_name.has_value())
{
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeString>(), InputFileNameParser::INPUT_FILE_NAME});
}
if (block_start.has_value())
{
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeInt64>(), InputFileNameParser::INPUT_FILE_BLOCK_START});
}
if (block_length.has_value())
{
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeInt64>(), InputFileNameParser::INPUT_FILE_BLOCK_LENGTH});
}
return output_header;
}

class InputFileExprProjectTransform : public DB::ISimpleTransform
{
public:
InputFileExprProjectTransform(
const DB::Block& input_header_,
const DB::Block& output_header_,
const std::optional<String>& file_name,
const std::optional<Int64>& block_start,
const std::optional<Int64>& block_length)
: ISimpleTransform(input_header_, output_header_, true)
, file_name(file_name)
, block_start(block_start)
, block_length(block_length)
{
}

String getName() const override { return "InputFileExprProjectTransform"; }
void transform(DB::Chunk & chunk) override
{
InputFileNameParser::addInputFileColumnsToChunk(output.getHeader(), chunk, file_name, block_start, block_length);
}

private:
std::optional<String> file_name;
std::optional<Int64> block_start;
std::optional<Int64> block_length;
};

class InputFileExprProjectStep : public DB::ITransformingStep
{
public:
InputFileExprProjectStep(
const DB::DataStream& input_stream,
const std::optional<String>& file_name,
const std::optional<Int64>& block_start,
const std::optional<Int64>& block_length)
: ITransformingStep(input_stream, getOutputHeader(input_stream, file_name, block_start, block_length), getTraits(), true)
, file_name(file_name)
, block_start(block_start)
, block_length(block_length)
{
}

String getName() const override { return "InputFileExprProjectStep"; }
void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings & /*settings*/) override
{
pipeline.addSimpleTransform(
[&](const DB::Block & header)
{
return std::make_shared<InputFileExprProjectTransform>(header, output_stream->header, file_name, block_start, block_length);
});
}

protected:
void updateOutputStream() override
{
output_stream = createOutputStream(input_streams.front(), output_stream->header, getDataStreamTraits());
}

private:
std::optional<String> file_name;
std::optional<Int64> block_start;
std::optional<Int64> block_length;
};

bool InputFileNameParser::hasInputFileNameColumn(const DB::Block & block)
{
auto names = block.getNames();
return std::find(names.begin(), names.end(), INPUT_FILE_NAME) != names.end();
}

bool InputFileNameParser::hasInputFileBlockStartColumn(const DB::Block & block)
{
auto names = block.getNames();
return std::find(names.begin(), names.end(), INPUT_FILE_BLOCK_START) != names.end();
}

bool InputFileNameParser::hasInputFileBlockLengthColumn(const DB::Block & block)
{
auto names = block.getNames();
return std::find(names.begin(), names.end(), INPUT_FILE_BLOCK_LENGTH) != names.end();
}

void InputFileNameParser::addInputFileColumnsToChunk(
const DB::Block & header,
DB::Chunk & chunk,
const std::optional<String> & file_name,
const std::optional<Int64> & block_start,
const std::optional<Int64> & block_length)
{
auto output_columns = chunk.getColumns();
for (size_t i = 0; i < header.columns(); ++i)
{
const auto & column = header.getByPosition(i);
if (column.name == INPUT_FILE_NAME)
{
if (!file_name.has_value())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Input file name is not set");
auto type_string = std::make_shared<DB::DataTypeString>();
auto file_name_column = type_string->createColumn();
file_name_column->insertMany(file_name.value(), chunk.getNumRows());
output_columns.insert(output_columns.begin() + i, std::move(file_name_column));
}
else if (column.name == INPUT_FILE_BLOCK_START)
{
if (!block_start.has_value())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "block_start is not set");
auto type_int64 = std::make_shared<DB::DataTypeInt64>();
auto block_start_column = type_int64->createColumn();
block_start_column->insertMany(block_start.value(), chunk.getNumRows());
output_columns.insert(output_columns.begin() + i, std::move(block_start_column));
}
else if (column.name == INPUT_FILE_BLOCK_LENGTH)
{
if (!block_length.has_value())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "block_length is not set");
auto type_int64 = std::make_shared<DB::DataTypeInt64>();
auto block_length_column = type_int64->createColumn();
block_length_column->insertMany(block_length.value(), chunk.getNumRows());
output_columns.insert(output_columns.begin() + i, std::move(block_length_column));
}
}
chunk.setColumns(output_columns, chunk.getNumRows());
}

bool InputFileNameParser::containsInputFileColumns(const DB::Block & block)
{
return hasInputFileNameColumn(block) || hasInputFileBlockStartColumn(block) || hasInputFileBlockLengthColumn(block);
}

DB::Block InputFileNameParser::removeInputFileColumn(const DB::Block & block)
{
const auto & columns = block.getColumnsWithTypeAndName();
DB::ColumnsWithTypeAndName result_columns;
for (const auto & column : columns) {
if (!INPUT_FILE_COLUMNS_SET.contains(column.name))
{
result_columns.push_back(column);
}
}
return result_columns;
}

void InputFileNameParser::addInputFileProjectStep(DB::QueryPlan & plan)
{
if (!file_name.has_value() && !block_start.has_value() && !block_length.has_value()) return;
auto step = std::make_unique<InputFileExprProjectStep>(plan.getCurrentDataStream(), file_name, block_start, block_length);
step->setStepDescription("Input file expression project");
plan.addStep(std::move(step));
}

void InputFileNameParser::addInputFileColumnsToChunk(const DB::Block & header, DB::Chunk & chunk)
{
addInputFileColumnsToChunk(header, chunk, file_name, block_start, block_length);
}


}
70 changes: 70 additions & 0 deletions cpp-ch/local-engine/Parser/InputFileNameParser.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <Processors/QueryPlan/ExpressionStep.h>

namespace DB
{
class Chunk;
}

namespace local_engine
{
class InputFileNameParser
{
public:
static inline const String& INPUT_FILE_NAME = "input_file_name";
static inline const String& INPUT_FILE_BLOCK_START = "input_file_block_start";
static inline const String& INPUT_FILE_BLOCK_LENGTH = "input_file_block_length";
static inline std::unordered_set INPUT_FILE_COLUMNS_SET = {
INPUT_FILE_NAME, INPUT_FILE_BLOCK_START, INPUT_FILE_BLOCK_LENGTH
};

static bool hasInputFileNameColumn(const DB::Block& block);
static bool hasInputFileBlockStartColumn(const DB::Block& block);
static bool hasInputFileBlockLengthColumn(const DB::Block& block);
static bool containsInputFileColumns(const DB::Block& block);
static DB::Block removeInputFileColumn(const DB::Block& block);
static void addInputFileColumnsToChunk(const DB::Block& header, DB::Chunk& chunk,
const std::optional<String>& file_name,
const std::optional<Int64>& block_start,
const std::optional<Int64>& block_length);


void setFileName(const String& file_name)
{
this->file_name = file_name;
}

void setBlockStart(const Int64 block_start)
{
this->block_start = block_start;
}

void setBlockLength(const Int64 block_length)
{
this->block_length = block_length;
}

void addInputFileProjectStep(DB::QueryPlan& plan);
void addInputFileColumnsToChunk(const DB::Block & header, DB::Chunk & chunk);
private:
std::optional<String> file_name;
std::optional<Int64> block_start;
std::optional<Int64> block_length;
};
} // local_engine
5 changes: 0 additions & 5 deletions cpp-ch/local-engine/Parser/LocalExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ bool LocalExecutor::hasNext()

bool LocalExecutor::fallbackMode()
{
if (executor.get() || fallback_mode)
std::cerr << fmt::format("executor {} in fallback mode\n", reinterpret_cast<long>(this));
else
std::cerr << fmt::format("executor {} not in fallback mode\n", reinterpret_cast<long>(this));
return executor.get() || fallback_mode;
}

Expand All @@ -92,7 +88,6 @@ SparkRowInfoPtr LocalExecutor::next()
spark_buffer = std::make_unique<SparkBuffer>();
spark_buffer->address = row_info->getBufferAddress();
spark_buffer->size = row_info->getTotalBytes();
std::cerr << "call next\n";
return row_info;
}
Block * LocalExecutor::nextColumnar()
Expand Down
Loading

0 comments on commit 0e9c9c7

Please sign in to comment.