Skip to content

Commit

Permalink
Add sort merge join metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ulysses-you committed Dec 5, 2023
1 parent 9d07d48 commit 71c1368
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,32 @@ class MetricsApiImpl extends MetricsApi with Logging {
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
"prepareTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to prepare left list"),
"processTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to process"),
"joinTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to merge join"),
"totaltimeSortmergejoin" -> SQLMetrics
.createTimingMetric(sparkContext, "totaltime sortmergejoin")
"numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of merge join"),
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations"),
"streamPreProjectionCpuCount" -> SQLMetrics.createMetric(
sparkContext,
"stream preProject cpu wall time count"),
"streamPreProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"totaltime of stream preProjection"),
"bufferPreProjectionCpuCount" -> SQLMetrics.createMetric(
sparkContext,
"preProject cpu wall time count"),
"bufferPreProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"totaltime to build preProjection"),
"postProjectionCpuCount" -> SQLMetrics.createMetric(
sparkContext,
"postProject cpu wall time count"),
"postProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"totaltime of postProjection")
)

override def genSortMergeJoinTransformerMetricsUpdater(
Expand Down Expand Up @@ -476,20 +496,13 @@ class MetricsApiImpl extends MetricsApi with Logging {
"postProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"totaltime of postProjection"),
"postProjectionOutputRows" -> SQLMetrics.createMetric(
sparkContext,
"number of postProjection output rows"),
"postProjectionOutputVectors" -> SQLMetrics.createMetric(
sparkContext,
"number of postProjection output vectors"),
"finalOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of final output rows"),
"finalOutputVectors" -> SQLMetrics.createMetric(
sparkContext,
"number of final output vectors")
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes")
)

override def genHashJoinTransformerMetricsUpdater(
metrics: Map[String, SQLMetric]): MetricsUpdater = new HashJoinMetricsUpdaterImpl(metrics)
metrics: Map[String, SQLMetric]): MetricsUpdater = new HashJoinMetricsUpdater(metrics)

override def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.execution

import io.glutenproject.GlutenConfig

import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.internal.SQLConf

/**
 * Checks the SQL metrics exposed by [[SortMergeJoinExecTransformer]] on the Velox backend.
 *
 * The suite creates two parquet tables once (`metrics_t1` from `range(100)` and `metrics_t2`
 * from `range(200)`, both with columns `c1` and `c2`) and drops them again when it finishes.
 */
class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
  override protected val backend: String = "velox"
  override protected val resourcePath: String = "/tpch-data-parquet-velox"
  override protected val fileFormat: String = "parquet"

  /** Creates the two fixture tables after the parent suite has finished its own setup. */
  override def beforeAll(): Unit = {
    super.beforeAll()

    spark
      .range(100)
      .selectExpr("id as c1", "id % 3 as c2")
      .write
      .format("parquet")
      .saveAsTable("metrics_t1")

    spark
      .range(200)
      .selectExpr("id as c1", "id % 3 as c2")
      .write
      .format("parquet")
      .saveAsTable("metrics_t2")
  }

  /**
   * Drops the fixture tables and then always runs the parent teardown.
   *
   * `if exists` keeps the cleanup from throwing when `beforeAll` failed before a table was
   * created, and the `finally` guarantees `super.afterAll()` still runs if a drop fails.
   */
  override protected def afterAll(): Unit = {
    try {
      spark.sql("drop table if exists metrics_t1")
      spark.sql("drop table if exists metrics_t2")
    } finally {
      super.afterAll()
    }
  }

  /**
   * Looks up the sort merge join transformer in the executed plan of `df` and returns its
   * metrics map; the test fails with a descriptive message when the plan contains none.
   */
  private def sortMergeJoinMetrics(df: org.apache.spark.sql.DataFrame)
      : Map[String, org.apache.spark.sql.execution.metric.SQLMetric] = {
    val smj = find(df.queryExecution.executedPlan) {
      case _: SortMergeJoinExecTransformer => true
      case _ => false
    }
    assert(smj.isDefined, "expected a SortMergeJoinExecTransformer in the executed plan")
    smj.get.metrics
  }

  test("test sort merge join metrics") {
    // Disable forced shuffled hash join and broadcast join for the queries below.
    withSQLConf(
      GlutenConfig.COLUMNAR_FPRCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
      // without preproject
      runQueryAndCompare(
        "SELECT * FROM metrics_t1 join metrics_t2 on metrics_t1.c1 = metrics_t2.c1"
      ) {
        df =>
          val metrics = sortMergeJoinMetrics(df)
          // c1 is 0..99 in metrics_t1 and 0..199 in metrics_t2, so 100 keys match.
          assert(metrics("numOutputRows").value == 100)
          assert(metrics("numOutputVectors").value > 0)
          assert(metrics("numOutputBytes").value > 0)
      }

      // with preproject
      runQueryAndCompare(
        "SELECT * FROM metrics_t1 join metrics_t2 on metrics_t1.c1 + 1 = metrics_t2.c1 + 1"
      ) {
        df =>
          val metrics = sortMergeJoinMetrics(df)
          assert(metrics("numOutputRows").value == 100)
          assert(metrics("numOutputVectors").value > 0)
          // Both join keys are expressions, so both pre-projection counters are checked.
          assert(metrics("streamPreProjectionCpuCount").value > 0)
          assert(metrics("bufferPreProjectionCpuCount").value > 0)
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ case class SortMergeJoinExecTransformer(
// Metrics map of this sort merge join node. It is built on first access (lazy val) by the
// backend's metrics API from the node's SparkContext, and is marked @transient.
@transient override lazy val metrics =
BackendsApiManager.getMetricsApiInstance.genSortMergeJoinTransformerMetrics(sparkContext)

// Creates the MetricsUpdater for this sort merge join by handing this node's `metrics` map
// to the backend's metrics API.
override def metricsUpdater(): MetricsUpdater =
BackendsApiManager.getMetricsApiInstance.genSortMergeJoinTransformerMetricsUpdater(metrics)

val (bufferedKeys, streamedKeys, bufferedPlan, streamedPlan) =
(rightKeys, leftKeys, right, left)

Expand Down Expand Up @@ -170,9 +173,6 @@ case class SortMergeJoinExecTransformer(
getColumnarInputRDDs(streamedPlan) ++ getColumnarInputRDDs(bufferedPlan)
}

override def metricsUpdater(): MetricsUpdater =
BackendsApiManager.getMetricsApiInstance.genSortMergeJoinTransformerMetricsUpdater(metrics)

def genJoinParameters(): Any = {
val (isSMJ, isNullAwareAntiJoin) = (1, 0)
// Start with "JoinParameters:"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,45 @@ import io.glutenproject.substrait.JoinParams

import org.apache.spark.sql.execution.metric.SQLMetric

trait HashJoinMetricsUpdater extends MetricsUpdater {
import java.util

/**
 * A [[MetricsUpdater]] for join operators, fed through a single join-specific entry point.
 */
trait JoinMetricsUpdater extends MetricsUpdater {
/**
 * Updates this updater's metrics from the metrics collected for one join.
 *
 * @param joinMetrics
 *   the operator metrics belonging to the join. NOTE(review): implementations may mutate
 *   this list (the base class in this file removes its first element), so callers should
 *   not rely on it being unchanged afterwards.
 * @param singleMetrics
 *   the single-value metrics passed alongside the operator metrics; how they are used is up
 *   to the implementation.
 * @param joinParams
 *   the parameters recorded for this join, e.g. which projections were needed.
 */
def updateJoinMetrics(
joinMetrics: java.util.ArrayList[OperatorMetrics],
singleMetrics: SingleMetric,
joinParams: JoinParams): Unit
}

class HashJoinMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
extends HashJoinMetricsUpdater {
/**
 * Common part of the join metrics updaters.
 *
 * It treats the first entry of the operator metrics as the post-projection, folds it into
 * the post-projection and output metrics, and leaves the rest to the concrete join type.
 *
 * @param metrics
 *   SQL metrics keyed by name; the five keys looked up below must be present, otherwise
 *   construction fails on the missing key.
 */
abstract class JoinMetricsUpdaterBase(val metrics: Map[String, SQLMetric])
  extends JoinMetricsUpdater {
  val postProjectionCpuCount: SQLMetric = metrics("postProjectionCpuCount")
  val postProjectionWallNanos: SQLMetric = metrics("postProjectionWallNanos")
  val numOutputRows: SQLMetric = metrics("numOutputRows")
  val numOutputVectors: SQLMetric = metrics("numOutputVectors")
  val numOutputBytes: SQLMetric = metrics("numOutputBytes")

  /**
   * Consumes the post-projection entry and then delegates to [[updateJoinMetricsInternal]].
   *
   * The head of `joinMetrics` is removed from the list, so the subclass hook only sees the
   * remaining operators. `singleMetrics` is not used here.
   */
  final override def updateJoinMetrics(
      joinMetrics: java.util.ArrayList[OperatorMetrics],
      singleMetrics: SingleMetric,
      joinParams: JoinParams): Unit = {
    // This updater requires a post-projection to be present on the join.
    assert(joinParams.postProjectionNeeded)

    val postProject = joinMetrics.remove(0)

    // Time spent in the post-projection itself.
    postProjectionCpuCount += postProject.cpuCount
    postProjectionWallNanos += postProject.wallNanos

    // The post-projection's output is reported as the output of the whole join.
    numOutputRows += postProject.outputRows
    numOutputVectors += postProject.outputVectors
    numOutputBytes += postProject.outputBytes

    updateJoinMetricsInternal(joinMetrics, joinParams)
  }

  /**
   * Hook for the concrete join type.
   *
   * @param joinMetrics
   *   the operator metrics that remain after the post-projection entry was removed
   * @param joinParams
   *   the parameters recorded for this join
   */
  protected def updateJoinMetricsInternal(
      joinMetrics: java.util.ArrayList[OperatorMetrics],
      joinParams: JoinParams): Unit
}

class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
extends JoinMetricsUpdaterBase(metrics) {
val hashBuildInputRows: SQLMetric = metrics("hashBuildInputRows")
val hashBuildOutputRows: SQLMetric = metrics("hashBuildOutputRows")
val hashBuildOutputVectors: SQLMetric = metrics("hashBuildOutputVectors")
Expand Down Expand Up @@ -71,28 +101,10 @@ class HashJoinMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
val buildPreProjectionCpuCount: SQLMetric = metrics("buildPreProjectionCpuCount")
val buildPreProjectionWallNanos: SQLMetric = metrics("buildPreProjectionWallNanos")

val postProjectionCpuCount: SQLMetric = metrics("postProjectionCpuCount")
val postProjectionWallNanos: SQLMetric = metrics("postProjectionWallNanos")
val postProjectionOutputRows: SQLMetric = metrics("postProjectionOutputRows")
val postProjectionOutputVectors: SQLMetric = metrics("postProjectionOutputVectors")

val finalOutputRows: SQLMetric = metrics("finalOutputRows")
val finalOutputVectors: SQLMetric = metrics("finalOutputVectors")

override def updateJoinMetrics(
override protected def updateJoinMetricsInternal(
joinMetrics: java.util.ArrayList[OperatorMetrics],
singleMetrics: SingleMetric,
joinParams: JoinParams): Unit = {
var idx = 0
if (joinParams.postProjectionNeeded) {
val postProjectMetrics = joinMetrics.get(idx)
postProjectionCpuCount += postProjectMetrics.cpuCount
postProjectionWallNanos += postProjectMetrics.wallNanos
postProjectionOutputRows += postProjectMetrics.outputRows
postProjectionOutputVectors += postProjectMetrics.outputVectors
idx += 1
}

// HashProbe
val hashProbeMetrics = joinMetrics.get(idx)
hashProbeInputRows += hashProbeMetrics.inputRows
Expand Down Expand Up @@ -140,3 +152,40 @@ class HashJoinMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
}
}
}

/**
 * Metrics updater for the sort merge join transformer.
 *
 * On top of the post-projection and output metrics handled by [[JoinMetricsUpdaterBase]], it
 * records the join operator's own cost and the cost of the optional pre-projections.
 *
 * @param metrics
 *   SQL metrics keyed by name; must contain the keys looked up below and in the base class.
 */
class SortMergeJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
  extends JoinMetricsUpdaterBase(metrics) {
  val cpuCount: SQLMetric = metrics("cpuCount")
  val wallNanos: SQLMetric = metrics("wallNanos")
  val peakMemoryBytes: SQLMetric = metrics("peakMemoryBytes")
  val numMemoryAllocations: SQLMetric = metrics("numMemoryAllocations")

  val streamPreProjectionCpuCount: SQLMetric = metrics("streamPreProjectionCpuCount")
  val streamPreProjectionWallNanos: SQLMetric = metrics("streamPreProjectionWallNanos")
  val bufferPreProjectionCpuCount: SQLMetric = metrics("bufferPreProjectionCpuCount")
  val bufferPreProjectionWallNanos: SQLMetric = metrics("bufferPreProjectionWallNanos")

  /**
   * Reads the remaining operator metrics in a fixed order: index 0 is the join itself,
   * followed by the buffered-side pre-projection (only when
   * `joinParams.buildPreProjectionNeeded`) and then the streamed-side pre-projection (only
   * when `joinParams.streamPreProjectionNeeded`).
   */
  override protected def updateJoinMetricsInternal(
      joinMetrics: util.ArrayList[OperatorMetrics],
      joinParams: JoinParams): Unit = {
    val joinOperator = joinMetrics.get(0)
    cpuCount += joinOperator.cpuCount
    wallNanos += joinOperator.wallNanos
    peakMemoryBytes += joinOperator.peakMemoryBytes
    numMemoryAllocations += joinOperator.numMemoryAllocations

    // The buffered-side pre-projection, when present, directly follows the join operator.
    val bufferProjectIdx = 1
    if (joinParams.buildPreProjectionNeeded) {
      val bufferProject = joinMetrics.get(bufferProjectIdx)
      bufferPreProjectionCpuCount += bufferProject.cpuCount
      bufferPreProjectionWallNanos += bufferProject.wallNanos
    }

    // The streamed-side entry moves up one slot when there is no buffered-side entry.
    val streamProjectIdx =
      if (joinParams.buildPreProjectionNeeded) bufferProjectIdx + 1 else bufferProjectIdx
    if (joinParams.streamPreProjectionNeeded) {
      val streamProject = joinMetrics.get(streamProjectIdx)
      streamPreProjectionCpuCount += streamProject.cpuCount
      streamPreProjectionWallNanos += streamProject.wallNanos
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ object MetricsUtil extends Logging {
MetricsUpdaterTree(
j.metricsUpdater(),
Seq(treeifyMetricsUpdaters(j.buildPlan), treeifyMetricsUpdaters(j.streamedPlan)))
case smj: SortMergeJoinExecTransformer =>
MetricsUpdaterTree(
smj.metricsUpdater(),
Seq(treeifyMetricsUpdaters(smj.bufferedPlan), treeifyMetricsUpdaters(smj.streamedPlan)))
case t: TransformSupport =>
MetricsUpdaterTree(t.metricsUpdater(), t.children.map(treeifyMetricsUpdaters))
case _ =>
Expand Down Expand Up @@ -196,6 +200,11 @@ object MetricsUtil extends Logging {
operatorMetrics,
metrics.getSingleMetrics,
joinParamsMap.get(operatorIdx))
case smj: SortMergeJoinMetricsUpdater =>
smj.updateJoinMetrics(
operatorMetrics,
metrics.getSingleMetrics,
joinParamsMap.get(operatorIdx))
case hau: HashAggregateMetricsUpdater =>
hau.updateAggregationMetrics(operatorMetrics, aggParamsMap.get(operatorIdx))
case lu: LimitMetricsUpdater =>
Expand Down

This file was deleted.

0 comments on commit 71c1368

Please sign in to comment.