From b9cce6dea9755781135bce7be2d8deef767f3fc8 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Mon, 9 Dec 2024 15:25:02 -0800 Subject: [PATCH] feat: Allow non standard partition functions in ScaleWriterPartitioningLocalPartition (#11762) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11762 For example Hive connector partitioning function Reviewed By: xiaoxmeng, Yuhta, tanjialiang Differential Revision: D66833831 fbshipit-source-id: 9ee130ec5e90d2d31973ef14344cd769da63d06a --- velox/exec/LocalPlanner.cpp | 8 +++----- velox/exec/ScaleWriterLocalPartition.cpp | 4 ---- velox/exec/tests/PlanNodeSerdeTest.cpp | 1 + velox/exec/tests/utils/PlanBuilder.cpp | 18 +++++++++++++----- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/velox/exec/LocalPlanner.cpp b/velox/exec/LocalPlanner.cpp index 3645e737b83e..811231881923 100644 --- a/velox/exec/LocalPlanner.cpp +++ b/velox/exec/LocalPlanner.cpp @@ -76,15 +76,13 @@ std::unique_ptr createScaleWriterLocalPartition( const std::shared_ptr& localPartitionNode, int32_t operatorId, DriverCtx* ctx) { - if (dynamic_cast( + if (dynamic_cast( &localPartitionNode->partitionFunctionSpec())) { - return std::make_unique( + return std::make_unique( operatorId, ctx, localPartitionNode); } - VELOX_CHECK_NOT_NULL(dynamic_cast( - &localPartitionNode->partitionFunctionSpec())); - return std::make_unique( + return std::make_unique( operatorId, ctx, localPartitionNode); } diff --git a/velox/exec/ScaleWriterLocalPartition.cpp b/velox/exec/ScaleWriterLocalPartition.cpp index c4995d097528..b243e2b808bb 100644 --- a/velox/exec/ScaleWriterLocalPartition.cpp +++ b/velox/exec/ScaleWriterLocalPartition.cpp @@ -57,10 +57,6 @@ ScaleWriterPartitioningLocalPartition::ScaleWriterPartitioningLocalPartition( : planNode->partitionFunctionSpec().create( numTablePartitions_, /*localExchange=*/true); - if (partitionFunction_ != nullptr) { - VELOX_CHECK_NOT_NULL( - dynamic_cast(partitionFunction_.get())); - } } void ScaleWriterPartitioningLocalPartition::initialize() { diff --git a/velox/exec/tests/PlanNodeSerdeTest.cpp b/velox/exec/tests/PlanNodeSerdeTest.cpp index f815cb0360d5..c52d9e718d58 100644 --- a/velox/exec/tests/PlanNodeSerdeTest.cpp +++ b/velox/exec/tests/PlanNodeSerdeTest.cpp @@ -43,6 +43,7 @@ class PlanNodeSerdeTest : public testing::Test, connector::hive::LocationHandle::registerSerDe(); connector::hive::HiveColumnHandle::registerSerDe(); connector::hive::HiveInsertTableHandle::registerSerDe(); + connector::hive::registerHivePartitionFunctionSerDe(); core::PlanNode::registerSerDe(); core::ITypedExpr::registerSerDe(); registerPartitionFunctionSerDe(); diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index e2eeceda311a..08a3698913a2 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -1293,12 +1293,20 @@ PlanBuilder& PlanBuilder::localPartition(const std::vector& keys) { PlanBuilder& PlanBuilder::scaleWriterlocalPartition( const std::vector& keys) { - planNode_ = createLocalPartitionNode( + std::vector keyIndices; + keyIndices.reserve(keys.size()); + for (const auto& key : keys) { + keyIndices.push_back(planNode_->outputType()->getChildIdx(key)); + } + auto hivePartitionFunctionFactory = + std::make_shared( + 1009, keyIndices, std::vector{}); + planNode_ = std::make_shared( nextPlanNodeId(), - exprs(keys, planNode_->outputType()), - /*scaleWriter=*/true, - {planNode_}, - pool_); + core::LocalPartitionNode::Type::kRepartition, + true, + hivePartitionFunctionFactory, + std::vector{planNode_}); return *this; }