Skip to content

Commit

Permalink
[GLUTEN-7745][VL] Incorporate SQL Union operator into Velox execution…
Browse files Browse the repository at this point in the history
… pipeline (#7842)
  • Loading branch information
zhztheplayer authored Dec 3, 2024
1 parent 9059ff6 commit 6dd91ba
Show file tree
Hide file tree
Showing 23 changed files with 604 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,14 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
s"SampleTransformer metrics update is not supported in CH backend")
}

override def genUnionTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
throw new UnsupportedOperationException(
"UnionExecTransformer metrics update is not supported in CH backend")

override def genUnionTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
throw new UnsupportedOperationException(
"UnionExecTransformer metrics update is not supported in CH backend")

def genWriteFilesTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"physicalWrittenBytes" -> SQLMetrics.createMetric(sparkContext, "number of written bytes"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class GlutenURLDecoder {
* <p><em><strong>Note:</strong> The <a href=
* "http://www.w3.org/TR/html40/appendix/notes.html#non-ascii-chars"> World Wide Web Consortium
* Recommendation</a> states that UTF-8 should be used. Not doing so may introduce
* incompatibilites.</em>
* incompatibilities.</em>
*
* @param s the <code>String</code> to decode
* @param enc The name of a supported <a href="../lang/package-summary.html#charenc">character
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.{HdfsConfGenerator, SparkConf, SparkContext}
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.execution.ColumnarCachedBatchSerializer
import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules
import org.apache.spark.sql.execution.datasources.velox.{VeloxParquetWriterInjects, VeloxRowSplitter}
import org.apache.spark.sql.expression.UDFResolver
Expand Down Expand Up @@ -75,7 +76,7 @@ class VeloxListenerApi extends ListenerApi with Logging {
if (conf.getBoolean(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, defaultValue = false)) {
conf.set(
StaticSQLConf.SPARK_CACHE_SERIALIZER.key,
"org.apache.spark.sql.execution.ColumnarCachedBatchSerializer")
classOf[ColumnarCachedBatchSerializer].getName)
}

// Static initializers for driver.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,4 +582,15 @@ class VeloxMetricsApi extends MetricsApi with Logging {

override def genSampleTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
new SampleMetricsUpdater(metrics)

override def genUnionTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map(
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of union"),
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count")
)

override def genUnionTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
new UnionMetricsUpdater(metrics)
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ object VeloxRuleApi {
c => HeuristicTransform.Single(validatorBuilder(c.glutenConf), rewrites, offloads))

// Legacy: Post-transform rules.
injector.injectPostTransform(_ => UnionTransformerRule())
injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
injector.injectPostTransform(c => RewriteTransformer.apply(c.session))
Expand Down Expand Up @@ -178,6 +179,7 @@ object VeloxRuleApi {

// Gluten RAS: Post rules.
injector.injectPostTransform(_ => RemoveTransitions)
injector.injectPostTransform(_ => UnionTransformerRule())
injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
injector.injectPostTransform(c => RewriteTransformer.apply(c.session))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ object MetricsUtil extends Logging {
assert(t.children.size == 1, "MetricsUpdater.None can only be used on unary operator")
treeifyMetricsUpdaters(t.children.head)
case t: TransformSupport =>
MetricsUpdaterTree(t.metricsUpdater(), t.children.map(treeifyMetricsUpdaters))
// Reversed children order to match the traversal code.
MetricsUpdaterTree(t.metricsUpdater(), t.children.reverse.map(treeifyMetricsUpdaters))
case _ =>
MetricsUpdaterTree(MetricsUpdater.Terminate, Seq())
}
Expand Down Expand Up @@ -233,6 +234,12 @@ object MetricsUtil extends Logging {
operatorMetrics,
metrics.getSingleMetrics,
joinParamsMap.get(operatorIdx))
case u: UnionMetricsUpdater =>
// JoinRel outputs two suites of metrics respectively for hash build and hash probe.
// Therefore, fetch one more suite of metrics here.
operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx))
curMetricsIdx -= 1
u.updateUnionMetrics(operatorMetrics)
case hau: HashAggregateMetricsUpdater =>
hau.updateAggregationMetrics(operatorMetrics, aggParamsMap.get(operatorIdx))
case lu: LimitMetricsUpdater =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.metrics

import org.apache.spark.sql.execution.metric.SQLMetric

class UnionMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
throw new UnsupportedOperationException()
}

def updateUnionMetrics(unionMetrics: java.util.ArrayList[OperatorMetrics]): Unit = {
// Union was interpreted to LocalExchange + LocalPartition. Use metrics from LocalExchange.
val localExchangeMetrics = unionMetrics.get(0)
metrics("numInputRows") += localExchangeMetrics.inputRows
metrics("inputVectors") += localExchangeMetrics.inputVectors
metrics("inputBytes") += localExchangeMetrics.inputBytes
metrics("cpuCount") += localExchangeMetrics.cpuCount
metrics("wallNanos") += localExchangeMetrics.wallNanos
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,11 +537,37 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
|""".stripMargin) {
df =>
{
getExecutedPlan(df).exists(plan => plan.find(_.isInstanceOf[ColumnarUnionExec]).isDefined)
assert(
getExecutedPlan(df).exists(
plan => plan.find(_.isInstanceOf[ColumnarUnionExec]).isDefined))
}
}
}

test("union_all two tables with known partitioning") {
withSQLConf(GlutenConfig.NATIVE_UNION_ENABLED.key -> "true") {
compareDfResultsAgainstVanillaSpark(
() => {
val df1 = spark.sql("select l_orderkey as orderkey from lineitem")
val df2 = spark.sql("select o_orderkey as orderkey from orders")
df1.repartition(5).union(df2.repartition(5))
},
compareResult = true,
checkGlutenOperatorMatch[UnionExecTransformer]
)

compareDfResultsAgainstVanillaSpark(
() => {
val df1 = spark.sql("select l_orderkey as orderkey from lineitem")
val df2 = spark.sql("select o_orderkey as orderkey from orders")
df1.repartition(5).union(df2.repartition(6))
},
compareResult = true,
checkGlutenOperatorMatch[ColumnarUnionExec]
)
}
}

test("union_all three tables") {
runQueryAndCompare("""
|select count(orderkey) from (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,10 @@ class VeloxOrcDataTypeValidationSuite extends VeloxWholeStageTransformerSuite {
|""".stripMargin) {
df =>
{
assert(getExecutedPlan(df).exists(plan => plan.isInstanceOf[ColumnarUnionExec]))
assert(
getExecutedPlan(df).exists(
plan =>
plan.isInstanceOf[ColumnarUnionExec] || plan.isInstanceOf[UnionExecTransformer]))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,10 @@ class VeloxParquetDataTypeValidationSuite extends VeloxWholeStageTransformerSuit
|""".stripMargin) {
df =>
{
assert(getExecutedPlan(df).exists(plan => plan.isInstanceOf[ColumnarUnionExec]))
assert(
getExecutedPlan(df).exists(
plan =>
plan.isInstanceOf[ColumnarUnionExec] || plan.isInstanceOf[UnionExecTransformer]))
}
}

Expand Down
44 changes: 38 additions & 6 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ WholeStageResultIterator::WholeStageResultIterator(
std::move(queryCtx),
velox::exec::Task::ExecutionMode::kSerial);
if (!task_->supportSerialExecutionMode()) {
throw std::runtime_error("Task doesn't support single thread execution: " + planNode->toString());
throw std::runtime_error("Task doesn't support single threaded execution: " + planNode->toString());
}
auto fileSystem = velox::filesystems::getFileSystem(spillDir, nullptr);
GLUTEN_CHECK(fileSystem != nullptr, "File System for spilling is null!");
Expand Down Expand Up @@ -248,15 +248,47 @@ void WholeStageResultIterator::getOrderedNodeIds(
const std::shared_ptr<const velox::core::PlanNode>& planNode,
std::vector<velox::core::PlanNodeId>& nodeIds) {
bool isProjectNode = (std::dynamic_pointer_cast<const velox::core::ProjectNode>(planNode) != nullptr);
bool isLocalExchangeNode = (std::dynamic_pointer_cast<const velox::core::LocalPartitionNode>(planNode) != nullptr);
bool isUnionNode = isLocalExchangeNode &&
std::dynamic_pointer_cast<const velox::core::LocalPartitionNode>(planNode)->type() ==
velox::core::LocalPartitionNode::Type::kGather;
const auto& sourceNodes = planNode->sources();
for (const auto& sourceNode : sourceNodes) {
if (isProjectNode) {
GLUTEN_CHECK(sourceNodes.size() == 1, "Illegal state");
const auto sourceNode = sourceNodes.at(0);
// Filter over Project are mapped into FilterProject operator in Velox.
// Metrics are all applied on Project node, and the metrics for Filter node
// do not exist.
if (isProjectNode && std::dynamic_pointer_cast<const velox::core::FilterNode>(sourceNode)) {
if (std::dynamic_pointer_cast<const velox::core::FilterNode>(sourceNode)) {
omittedNodeIds_.insert(sourceNode->id());
}
getOrderedNodeIds(sourceNode, nodeIds);
nodeIds.emplace_back(planNode->id());
return;
}

if (isUnionNode) {
// FIXME: The whole metrics system in gluten-substrait is magic. Passing metrics trees through JNI with a trivial
// array is possible but requires for a solid design. Apparently we haven't had it. All the code requires complete
// rework.
// Union was interpreted as LocalPartition + LocalExchange + 2 fake projects as children in Velox. So we only fetch
// metrics from the root node.
std::vector<std::shared_ptr<const velox::core::PlanNode>> unionChildren{};
for (const auto& source : planNode->sources()) {
const auto projectedChild = std::dynamic_pointer_cast<const velox::core::ProjectNode>(source);
GLUTEN_CHECK(projectedChild != nullptr, "Illegal state");
const auto projectSources = projectedChild->sources();
GLUTEN_CHECK(projectSources.size() == 1, "Illegal state");
const auto projectSource = projectSources.at(0);
getOrderedNodeIds(projectSource, nodeIds);
}
nodeIds.emplace_back(planNode->id());
return;
}

for (const auto& sourceNode : sourceNodes) {
// Post-order traversal.
getOrderedNodeIds(sourceNode, nodeIds);
}
nodeIds.emplace_back(planNode->id());
}
Expand Down Expand Up @@ -350,9 +382,9 @@ void WholeStageResultIterator::collectMetrics() {
continue;
}

const auto& status = planStats.at(nodeId);
// Add each operator status into metrics.
for (const auto& entry : status.operatorStats) {
const auto& stats = planStats.at(nodeId);
// Add each operator stats into metrics.
for (const auto& entry : stats.operatorStats) {
const auto& second = entry.second;
metrics_->get(Metrics::kInputRows)[metricIndex] = second->inputRows;
metrics_->get(Metrics::kInputVectors)[metricIndex] = second->inputVectors;
Expand Down
46 changes: 46 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,50 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(
childNode);
}

core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::SetRel& setRel) {
switch (setRel.op()) {
case ::substrait::SetRel_SetOp::SetRel_SetOp_SET_OP_UNION_ALL: {
std::vector<core::PlanNodePtr> children;
for (int32_t i = 0; i < setRel.inputs_size(); ++i) {
const auto& input = setRel.inputs(i);
children.push_back(toVeloxPlan(input));
}
GLUTEN_CHECK(!children.empty(), "At least one source is required for Velox LocalPartition");

// Velox doesn't allow different field names in schemas of LocalPartitionNode's children.
// Add project nodes to unify the schemas.
const RowTypePtr outRowType = asRowType(children[0]->outputType());
std::vector<std::string> outNames;
for (int32_t colIdx = 0; colIdx < outRowType->size(); ++colIdx) {
const auto name = outRowType->childAt(colIdx)->name();
outNames.push_back(name);
}

std::vector<core::PlanNodePtr> projectedChildren;
for (int32_t i = 0; i < children.size(); ++i) {
const auto& child = children[i];
const RowTypePtr& childRowType = child->outputType();
std::vector<core::TypedExprPtr> expressions;
for (int32_t colIdx = 0; colIdx < outNames.size(); ++colIdx) {
const auto fa =
std::make_shared<core::FieldAccessTypedExpr>(childRowType->childAt(colIdx), childRowType->nameOf(colIdx));
const auto cast = std::make_shared<core::CastTypedExpr>(outRowType->childAt(colIdx), fa, false);
expressions.push_back(cast);
}
auto project = std::make_shared<core::ProjectNode>(nextPlanNodeId(), outNames, expressions, child);
projectedChildren.push_back(project);
}
return std::make_shared<core::LocalPartitionNode>(
nextPlanNodeId(),
core::LocalPartitionNode::Type::kGather,
std::make_shared<core::GatherPartitionFunctionSpec>(),
projectedChildren);
}
default:
throw GlutenException("Unsupported SetRel op: " + std::to_string(setRel.op()));
}
}

core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::SortRel& sortRel) {
auto childNode = convertSingleInput<::substrait::SortRel>(sortRel);
auto [sortingKeys, sortingOrders] = processSortField(sortRel.sorts(), childNode->outputType());
Expand Down Expand Up @@ -1298,6 +1342,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
return toVeloxPlan(rel.write());
} else if (rel.has_windowgrouplimit()) {
return toVeloxPlan(rel.windowgrouplimit());
} else if (rel.has_set()) {
return toVeloxPlan(rel.set());
} else {
VELOX_NYI("Substrait conversion not supported for Rel.");
}
Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ class SubstraitToVeloxPlanConverter {
/// Used to convert Substrait WindowGroupLimitRel into Velox PlanNode.
core::PlanNodePtr toVeloxPlan(const ::substrait::WindowGroupLimitRel& windowGroupLimitRel);

/// Used to convert Substrait SetRel into Velox PlanNode.
core::PlanNodePtr toVeloxPlan(const ::substrait::SetRel& setRel);

/// Used to convert Substrait JoinRel into Velox PlanNode.
core::PlanNodePtr toVeloxPlan(const ::substrait::JoinRel& joinRel);

Expand Down
Loading

0 comments on commit 6dd91ba

Please sign in to comment.