Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Dec 2, 2024
1 parent a9b81f1 commit a036ed3
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,12 @@ object MetricsUtil extends Logging {
MetricsUpdaterTree(
smj.metricsUpdater(),
Seq(treeifyMetricsUpdaters(smj.bufferedPlan), treeifyMetricsUpdaters(smj.streamedPlan)))
case u: UnionExecTransformer =>
// Union has 2 dummy project children generated natively in Velox plan.
MetricsUpdaterTree(
u.metricsUpdater(),
u.children.map(
child =>
MetricsUpdaterTree(
MetricsUpdater.Todo,
child.children.map(treeifyMetricsUpdaters))))
case t: TransformSupport if t.metricsUpdater() == MetricsUpdater.None =>
assert(t.children.size == 1, "MetricsUpdater.None can only be used on unary operator")
treeifyMetricsUpdaters(t.children.head)
case t: TransformSupport =>
MetricsUpdaterTree(t.metricsUpdater(), t.children.map(treeifyMetricsUpdaters))
// Reversed children order to match the traversal code.
MetricsUpdaterTree(t.metricsUpdater(), t.children.reverse.map(treeifyMetricsUpdaters))
case _ =>
MetricsUpdaterTree(MetricsUpdater.Terminate, Seq())
}
Expand Down Expand Up @@ -242,6 +234,12 @@ object MetricsUtil extends Logging {
operatorMetrics,
metrics.getSingleMetrics,
joinParamsMap.get(operatorIdx))
case u: UnionMetricsUpdater =>
// JoinRel outputs two suites of metrics respectively for hash build and hash probe.
// Therefore, fetch one more suite of metrics here.
operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx))
curMetricsIdx -= 1
u.updateUnionMetrics(operatorMetrics)
case hau: HashAggregateMetricsUpdater =>
hau.updateAggregationMetrics(operatorMetrics, aggParamsMap.get(operatorIdx))
case lu: LimitMetricsUpdater =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ import org.apache.spark.sql.execution.metric.SQLMetric

class UnionMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
metrics("numInputRows") += operatorMetrics.inputRows
metrics("inputVectors") += operatorMetrics.inputVectors
metrics("inputBytes") += operatorMetrics.inputBytes
metrics("cpuCount") += operatorMetrics.cpuCount
metrics("wallNanos") += operatorMetrics.wallNanos
}
throw new UnsupportedOperationException()
}

def updateUnionMetrics(unionMetrics: java.util.ArrayList[OperatorMetrics]): Unit = {
// Union was interpreted to LocalExchange + LocalPartition. Use metrics from LocalExchange.
val localExchangeMetrics = unionMetrics.get(0)
metrics("numInputRows") += localExchangeMetrics.inputRows
metrics("inputVectors") += localExchangeMetrics.inputVectors
metrics("inputBytes") += localExchangeMetrics.inputBytes
metrics("cpuCount") += localExchangeMetrics.cpuCount
metrics("wallNanos") += localExchangeMetrics.wallNanos
}
}
42 changes: 37 additions & 5 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,47 @@ void WholeStageResultIterator::getOrderedNodeIds(
const std::shared_ptr<const velox::core::PlanNode>& planNode,
std::vector<velox::core::PlanNodeId>& nodeIds) {
bool isProjectNode = (std::dynamic_pointer_cast<const velox::core::ProjectNode>(planNode) != nullptr);
bool isLocalExchangeNode = (std::dynamic_pointer_cast<const velox::core::LocalPartitionNode>(planNode) != nullptr);
bool isUnionNode = isLocalExchangeNode &&
std::dynamic_pointer_cast<const velox::core::LocalPartitionNode>(planNode)->type() ==
velox::core::LocalPartitionNode::Type::kGather;
const auto& sourceNodes = planNode->sources();
for (const auto& sourceNode : sourceNodes) {
if (isProjectNode) {
GLUTEN_CHECK(sourceNodes.size() == 1, "Illegal state");
const auto sourceNode = sourceNodes.at(0);
// Filter over Project are mapped into FilterProject operator in Velox.
// Metrics are all applied on Project node, and the metrics for Filter node
// do not exist.
if (isProjectNode && std::dynamic_pointer_cast<const velox::core::FilterNode>(sourceNode)) {
if (std::dynamic_pointer_cast<const velox::core::FilterNode>(sourceNode)) {
omittedNodeIds_.insert(sourceNode->id());
}
getOrderedNodeIds(sourceNode, nodeIds);
nodeIds.emplace_back(planNode->id());
return;
}

if (isUnionNode) {
// FIXME: The whole metrics system in gluten-substrait is magic. Passing metrics trees through JNI with a trivial
// array is possible but requires for a solid design. Apparently we haven't had it. All the code requires complete
// rework.
// Union was interpreted as LocalPartition + LocalExchange + 2 fake projects as children in Velox. So we only fetch
// metrics from LocalExchange node.
std::vector<std::shared_ptr<const velox::core::PlanNode>> unionChildren{};
for (const auto& source : planNode->sources()) {
const auto projectedChild = std::dynamic_pointer_cast<const velox::core::ProjectNode>(source);
GLUTEN_CHECK(projectedChild != nullptr, "Illegal state");
const auto projectSources = projectedChild->sources();
GLUTEN_CHECK(projectSources.size() == 1, "Illegal state");
const auto projectSource = projectSources.at(0);
getOrderedNodeIds(projectSource, nodeIds);
}
nodeIds.emplace_back(planNode->id());
return;
}

for (const auto& sourceNode : sourceNodes) {
// Post-order traversal.
getOrderedNodeIds(sourceNode, nodeIds);
}
nodeIds.emplace_back(planNode->id());
}
Expand Down Expand Up @@ -350,9 +382,9 @@ void WholeStageResultIterator::collectMetrics() {
continue;
}

const auto& status = planStats.at(nodeId);
// Add each operator status into metrics.
for (const auto& entry : status.operatorStats) {
const auto& stats = planStats.at(nodeId);
// Add each operator stats into metrics.
for (const auto& entry : stats.operatorStats) {
const auto& second = entry.second;
metrics_->get(Metrics::kInputRows)[metricIndex] = second->inputRows;
metrics_->get(Metrics::kInputVectors)[metricIndex] = second->inputVectors;
Expand Down

0 comments on commit a036ed3

Please sign in to comment.