Skip to content

Commit

Permalink
[GLUTEN-8080][CH]Support function transform_keys/transform_values (#8085
Browse files Browse the repository at this point in the history
)

* support function transform_keys/transform_values

* fix velox failed ut

* revert all files

* support transform_keys/values

* add uts
  • Loading branch information
taiyang-li authored Dec 2, 2024
1 parent 6d7824e commit d75e90f
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,6 @@ object CHExpressionUtil {
TO_UTC_TIMESTAMP -> UtcTimestampValidator(),
FROM_UTC_TIMESTAMP -> UtcTimestampValidator(),
STACK -> DefaultValidator(),
TRANSFORM_KEYS -> DefaultValidator(),
TRANSFORM_VALUES -> DefaultValidator(),
RAISE_ERROR -> DefaultValidator(),
WIDTH_BUCKET -> DefaultValidator()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -860,4 +860,16 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS
val sql = "select cast(id % 2 = 1 as string) from range(10)"
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}

// Exercises transform_keys and transform_values on maps built per row with
// map_from_arrays: keys are (id+1, id+2, id+3), values are (1, id+2, 3).
// The key lambda adds 1 to each key; the value lambda adds 1 to each value.
test("Test transform_keys/transform_values") {
val sql = """
|select
| transform_keys(map_from_arrays(array(id+1, id+2, id+3),
| array(1, id+2, 3)), (k, v) -> k + 1),
| transform_values(map_from_arrays(array(id+1, id+2, id+3),
| array(1, id+2, 3)), (k, v) -> v + 1)
|from range(10)
|""".stripMargin
// Going by the helper's name, the query result is compared with vanilla Spark.
// NOTE(review): the meaning of the `true` flag and of the no-op callback is not
// visible in this hunk -- confirm against the helper's definition.
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
}
2 changes: 0 additions & 2 deletions cpp-ch/local-engine/Parser/FunctionParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,7 @@ FunctionParserPtr FunctionParserFactory::get(const String & name, ParserContextP
{
auto res = tryGet(name, ctx);
if (!res)
{
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unknown function parser {}", name);
}

return res;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeFunction.h>
#include <DataTypes/DataTypeNullable.h>
#include <Parser/FunctionParser.h>
#include <Parser/TypeParser.h>
#include <Parser/scalar_function_parser/lambdaFunction.h>
#include <Common/BlockTypeUtils.h>
#include <Common/CHUtil.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include "DataTypes/DataTypeMap.h"

/// Error codes referenced from this translation unit. They are only declared
/// here (`extern`); their definitions live elsewhere.
namespace DB::ErrorCodes
{
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}

namespace local_engine
{

/// Parser for Spark's `transform_keys` / `transform_values`.
///
/// The template flag selects which half of the map is rewritten by the lambda:
///   - transform_keys = true  -> parser name "transform_keys"
///   - transform_keys = false -> parser name "transform_values"
template <bool transform_keys = true>
class FunctionParserMapTransformImpl : public FunctionParser
{
public:
    static constexpr auto name = transform_keys ? "transform_keys" : "transform_values";
    String getName() const override { return name; }

    explicit FunctionParserMapTransformImpl(ParserContextPtr parser_context_) : FunctionParser(parser_context_) {}
    ~FunctionParserMapTransformImpl() override = default;

    /// Builds the ActionsDAG nodes for the function call.
    ///
    /// Spark transform_keys(map, func)   -> CH mapFromArrays(arrayMap(func, cast(map as array)), mapValues(map))
    /// Spark transform_values(map, func) -> CH mapFromArrays(mapKeys(map), arrayMap(func, cast(map as array)))
    ///
    /// @param substrait_func  the substrait scalar function; expected to carry exactly two
    ///                        arguments, the map expression and a two-argument lambda.
    /// @param actions_dag     DAG that receives the newly created nodes.
    /// @return the result node, passed through convertNodeTypeIfNeeded for `substrait_func`.
    /// @throws DB::Exception(SIZES_OF_COLUMNS_DOESNT_MATCH) if the function does not have two
    ///         arguments or the lambda does not have two arguments.
    /// @throws DB::Exception(BAD_ARGUMENTS) if the first argument is not a (nullable) map.
    const DB::ActionsDAG::Node *
    parse(const substrait::Expression_ScalarFunction & substrait_func, DB::ActionsDAG & actions_dag) const override
    {
        auto parsed_args = parseFunctionArguments(substrait_func, actions_dag);
        if (parsed_args.size() != 2)
            throw DB::Exception(DB::ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "{} function must have two arguments", getName());

        auto lambda_args = collectLambdaArguments(parser_context, substrait_func.arguments()[1].value().scalar_function());
        if (lambda_args.size() != 2)
            throw DB::Exception(
                DB::ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH, "The lambda function in {} must have two arguments", getName());

        const auto * map_node = parsed_args[0];
        const auto * func_node = parsed_args[1];
        const auto & map_type = map_node->result_type;

        /// checkAndGetDataType yields nullptr when the type is not a map; reject that
        /// explicitly instead of dereferencing a null pointer.
        const auto * map_data_type = checkAndGetDataType<DataTypeMap>(removeNullable(map_type).get());
        if (!map_data_type)
            throw DB::Exception(
                DB::ErrorCodes::BAD_ARGUMENTS, "The first argument of {} must be a map, but got {}", getName(), map_type->getName());

        /// View the map as its nested array type so that arrayMap can apply the lambda;
        /// nullability of the map is carried over to the array type.
        auto array_type = map_data_type->getNestedType();
        if (map_type->isNullable())
            array_type = std::make_shared<DataTypeNullable>(array_type);
        const auto * array_node = ActionsDAGUtil::convertNodeTypeIfNeeded(actions_dag, map_node, array_type);
        const auto * transformed_node = toFunctionNode(actions_dag, "arrayMap", {func_node, array_node});

        /// Re-assemble the map: the lambda output replaces one side, the other side is
        /// taken unchanged from the original map.
        const DB::ActionsDAG::Node * result_node = nullptr;
        if constexpr (transform_keys)
        {
            const auto * nontransformed_node = toFunctionNode(actions_dag, "mapValues", {map_node});
            result_node = toFunctionNode(actions_dag, "mapFromArrays", {transformed_node, nontransformed_node});
        }
        else
        {
            const auto * nontransformed_node = toFunctionNode(actions_dag, "mapKeys", {map_node});
            result_node = toFunctionNode(actions_dag, "mapFromArrays", {nontransformed_node, transformed_node});
        }
        return convertNodeTypeIfNeeded(substrait_func, result_node, actions_dag);
    }
};

/// Concrete parser types: `true` selects the "transform_keys" variant,
/// `false` selects the "transform_values" variant.
using FunctionParserTransformKeys = FunctionParserMapTransformImpl<true>;
using FunctionParserTransformValues = FunctionParserMapTransformImpl<false>;

/// File-local registration objects, one per parser type.
/// NOTE(review): presumably FunctionParserRegister's constructor adds the parser to the
/// parser factory during static initialization -- confirm against its definition.
static FunctionParserRegister<FunctionParserTransformKeys> register_transform_keys;
static FunctionParserRegister<FunctionParserTransformValues> register_transform_values;
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("aggregate function - array for non-primitive type")
.exclude("SPARK-14393: values generated by non-deterministic functions shouldn't change after coalesce or union")
.exclude("SPARK-24734: Fix containsNull of Concat for array type")
.exclude("transform keys function - primitive data types")
.exclude("transform keys function - Invalid lambda functions and exceptions")
.exclude("transform values function - test primitive data types")
.exclude("transform values function - test empty")
enableSuite[GlutenDataFrameHintSuite]
enableSuite[GlutenDataFrameImplicitsSuite]
enableSuite[GlutenDataFrameJoinSuite].exclude(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("aggregate function - array for non-primitive type")
.exclude("SPARK-14393: values generated by non-deterministic functions shouldn't change after coalesce or union")
.exclude("SPARK-24734: Fix containsNull of Concat for array type")
.exclude("transform keys function - primitive data types")
.exclude("transform keys function - Invalid lambda functions and exceptions")
.exclude("transform values function - test primitive data types")
.exclude("transform values function - test empty")
enableSuite[GlutenDataFrameHintSuite]
enableSuite[GlutenDataFrameImplicitsSuite]
enableSuite[GlutenDataFrameJoinSuite].exclude(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("aggregate function - array for non-primitive type")
.exclude("SPARK-14393: values generated by non-deterministic functions shouldn't change after coalesce or union")
.exclude("SPARK-24734: Fix containsNull of Concat for array type")
.exclude("transform keys function - primitive data types")
.exclude("transform keys function - Invalid lambda functions and exceptions")
.exclude("transform values function - test primitive data types")
.exclude("transform values function - test empty")
enableSuite[GlutenDataFrameHintSuite]
enableSuite[GlutenDataFrameImplicitsSuite]
enableSuite[GlutenDataFrameJoinSuite].exclude(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("aggregate function - array for non-primitive type")
.exclude("SPARK-14393: values generated by non-deterministic functions shouldn't change after coalesce or union")
.exclude("SPARK-24734: Fix containsNull of Concat for array type")
.exclude("transform keys function - primitive data types")
.exclude("transform keys function - Invalid lambda functions and exceptions")
.exclude("transform values function - test primitive data types")
.exclude("transform values function - test empty")
enableSuite[GlutenDataFrameHintSuite]
enableSuite[GlutenDataFrameImplicitsSuite]
enableSuite[GlutenDataFrameJoinSuite].exclude(
Expand Down

0 comments on commit d75e90f

Please sign in to comment.