[GLUTEN-7008][VL] Report spill metrics from Velox operators to Spark task (apache#7009)

Closes apache#7008
zhztheplayer authored and shamirchen committed Oct 14, 2024
1 parent a4eadeb commit 13952e3
Showing 10 changed files with 74 additions and 1 deletion.
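
Before the per-file diffs, a quick orientation. The commit plumbs two Velox spill counters from native operators up to the running Spark task: spilledInputBytes (the bytes entering spilling) is accumulated into Spark's memoryBytesSpilled, and spilledBytes (the bytes actually written out) into diskBytesSpilled. Below is a minimal sketch of that reporting path, using only names that appear in the diffs that follow (OperatorMetrics, SparkMetricsUtil, TaskResources); the method reportSpillToTask itself is illustrative, not part of the commit:

    import org.apache.spark.sql.utils.SparkMetricsUtil
    import org.apache.spark.util.TaskResources

    def reportSpillToTask(op: OperatorMetrics): Unit = {
      // Metrics updaters may be invoked where no live task exists, so only
      // report when a Spark task context is actually available.
      if (TaskResources.inSparkTask()) {
        val taskMetrics = TaskResources.getLocalTaskContext().taskMetrics()
        // spilledInputBytes -> memoryBytesSpilled, spilledBytes -> diskBytesSpilled
        SparkMetricsUtil.incMemoryBytesSpilled(taskMetrics, op.spilledInputBytes)
        SparkMetricsUtil.incDiskBytesSpilled(taskMetrics, op.spilledBytes)
      }
    }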
3 changes: 2 additions & 1 deletion cpp/core/jni/JniWrapper.cc
@@ -178,7 +178,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
metricsBuilderClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/metrics/Metrics;");

metricsBuilderConstructor = getMethodIdOrError(
- env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");
+ env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");

nativeColumnarToRowInfoClass =
createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
@@ -486,6 +486,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIter
metrics ? metrics->veloxToArrow : -1,
longArray[Metrics::kPeakMemoryBytes],
longArray[Metrics::kNumMemoryAllocations],
+ longArray[Metrics::kSpilledInputBytes],
longArray[Metrics::kSpilledBytes],
longArray[Metrics::kSpilledRows],
longArray[Metrics::kSpilledPartitions],
1 change: 1 addition & 0 deletions cpp/core/utils/metrics.h
@@ -54,6 +54,7 @@ struct Metrics {
kNumMemoryAllocations,

// Spill.
+ kSpilledInputBytes,
kSpilledBytes,
kSpilledRows,
kSpilledPartitions,
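
A note on keeping the native and Java sides in sync: the Metrics enum in cpp/core/utils/metrics.h, the long[] fields of the Java Metrics class, and the JNI constructor descriptor in cpp/core/jni/JniWrapper.cc must all change together. Each long[] constructor parameter contributes one "[J" to the descriptor string, so adding kSpilledInputBytes here is what adds the extra "[J" in the JniWrapper.cc hunk above; if the descriptor and the Java constructor drifted apart, getMethodIdOrError would presumably fail as soon as JNI_OnLoad runs.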
1 change: 1 addition & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
@@ -364,6 +364,7 @@ void WholeStageResultIterator::collectMetrics() {
metrics_->get(Metrics::kWallNanos)[metricIndex] = second->cpuWallTiming.wallNanos;
metrics_->get(Metrics::kPeakMemoryBytes)[metricIndex] = second->peakMemoryBytes;
metrics_->get(Metrics::kNumMemoryAllocations)[metricIndex] = second->numMemoryAllocations;
+ metrics_->get(Metrics::kSpilledInputBytes)[metricIndex] = second->spilledInputBytes;
metrics_->get(Metrics::kSpilledBytes)[metricIndex] = second->spilledBytes;
metrics_->get(Metrics::kSpilledRows)[metricIndex] = second->spilledRows;
metrics_->get(Metrics::kSpilledPartitions)[metricIndex] = second->spilledPartitions;
4 changes: 4 additions & 0 deletions …/org/apache/gluten/metrics/Metrics.java
@@ -32,6 +32,7 @@ public class Metrics implements IMetrics {
public long[] scanTime;
public long[] peakMemoryBytes;
public long[] numMemoryAllocations;
+ public long[] spilledInputBytes;
public long[] spilledBytes;
public long[] spilledRows;
public long[] spilledPartitions;
@@ -69,6 +70,7 @@ public Metrics(
long veloxToArrow,
long[] peakMemoryBytes,
long[] numMemoryAllocations,
+ long[] spilledInputBytes,
long[] spilledBytes,
long[] spilledRows,
long[] spilledPartitions,
@@ -101,6 +103,7 @@ public Metrics(
this.singleMetric.veloxToArrow = veloxToArrow;
this.peakMemoryBytes = peakMemoryBytes;
this.numMemoryAllocations = numMemoryAllocations;
+ this.spilledInputBytes = spilledInputBytes;
this.spilledBytes = spilledBytes;
this.spilledRows = spilledRows;
this.spilledPartitions = spilledPartitions;
@@ -138,6 +141,7 @@ public OperatorMetrics getOperatorMetrics(int index) {
wallNanos[index],
peakMemoryBytes[index],
numMemoryAllocations[index],
+ spilledInputBytes[index],
spilledBytes[index],
spilledRows[index],
spilledPartitions[index],
3 changes: 3 additions & 0 deletions …/org/apache/gluten/metrics/OperatorMetrics.java
@@ -30,6 +30,7 @@ public class OperatorMetrics implements IOperatorMetrics {
public long scanTime;
public long peakMemoryBytes;
public long numMemoryAllocations;
+ public long spilledInputBytes;
public long spilledBytes;
public long spilledRows;
public long spilledPartitions;
@@ -64,6 +65,7 @@ public OperatorMetrics(
long wallNanos,
long peakMemoryBytes,
long numMemoryAllocations,
+ long spilledInputBytes,
long spilledBytes,
long spilledRows,
long spilledPartitions,
@@ -95,6 +97,7 @@ public OperatorMetrics(
this.scanTime = scanTime;
this.peakMemoryBytes = peakMemoryBytes;
this.numMemoryAllocations = numMemoryAllocations;
+ this.spilledInputBytes = spilledInputBytes;
this.spilledBytes = spilledBytes;
this.spilledRows = spilledRows;
this.spilledPartitions = spilledPartitions;
10 changes: 10 additions & 0 deletions …/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
@@ -19,6 +19,8 @@ package org.apache.gluten.metrics
import org.apache.gluten.substrait.AggregationParams

import org.apache.spark.sql.execution.metric.SQLMetric
+ import org.apache.spark.sql.utils.SparkMetricsUtil
+ import org.apache.spark.util.TaskResources

trait HashAggregateMetricsUpdater extends MetricsUpdater {
def updateAggregationMetrics(
@@ -81,5 +83,13 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
rowConstructionWallNanos += aggregationMetrics.get(idx).wallNanos
idx += 1
}
+    if (TaskResources.inSparkTask()) {
+      SparkMetricsUtil.incMemoryBytesSpilled(
+        TaskResources.getLocalTaskContext().taskMetrics(),
+        aggMetrics.spilledInputBytes)
+      SparkMetricsUtil.incDiskBytesSpilled(
+        TaskResources.getLocalTaskContext().taskMetrics(),
+        aggMetrics.spilledBytes)
+    }
}
}
16 changes: 16 additions & 0 deletions …/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala
@@ -20,6 +20,8 @@ import org.apache.gluten.metrics.Metrics.SingleMetric
import org.apache.gluten.substrait.JoinParams

import org.apache.spark.sql.execution.metric.SQLMetric
+ import org.apache.spark.sql.utils.SparkMetricsUtil
+ import org.apache.spark.util.TaskResources

import java.util

@@ -150,6 +152,20 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
streamPreProjectionWallNanos += joinMetrics.get(idx).wallNanos
idx += 1
}
+    if (TaskResources.inSparkTask()) {
+      SparkMetricsUtil.incMemoryBytesSpilled(
+        TaskResources.getLocalTaskContext().taskMetrics(),
+        hashProbeMetrics.spilledInputBytes)
+      SparkMetricsUtil.incDiskBytesSpilled(
+        TaskResources.getLocalTaskContext().taskMetrics(),
+        hashProbeMetrics.spilledBytes)
+      SparkMetricsUtil.incMemoryBytesSpilled(
+        TaskResources.getLocalTaskContext().taskMetrics(),
+        hashBuildMetrics.spilledInputBytes)
+      SparkMetricsUtil.incDiskBytesSpilled(
+        TaskResources.getLocalTaskContext().taskMetrics(),
+        hashBuildMetrics.spilledBytes)
+    }
}
}

3 changes: 3 additions & 0 deletions …/org/apache/gluten/metrics/MetricsUtil.scala
@@ -105,6 +105,7 @@ object MetricsUtil extends Logging {
var wallNanos: Long = 0
var peakMemoryBytes: Long = 0
var numMemoryAllocations: Long = 0
+ var spilledInputBytes: Long = 0
var spilledBytes: Long = 0
var spilledRows: Long = 0
var spilledPartitions: Long = 0
@@ -130,6 +131,7 @@
wallNanos += metrics.wallNanos
peakMemoryBytes = peakMemoryBytes.max(metrics.peakMemoryBytes)
numMemoryAllocations += metrics.numMemoryAllocations
+ spilledInputBytes += metrics.spilledInputBytes
spilledBytes += metrics.spilledBytes
spilledRows += metrics.spilledRows
spilledPartitions += metrics.spilledPartitions
@@ -162,6 +164,7 @@
wallNanos,
peakMemoryBytes,
numMemoryAllocations,
+ spilledInputBytes,
spilledBytes,
spilledRows,
spilledPartitions,
10 changes: 10 additions & 0 deletions …/org/apache/gluten/metrics/SortMetricsUpdater.scala
@@ -17,6 +17,8 @@
package org.apache.gluten.metrics

import org.apache.spark.sql.execution.metric.SQLMetric
+ import org.apache.spark.sql.utils.SparkMetricsUtil
+ import org.apache.spark.util.TaskResources

class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {

@@ -34,6 +36,14 @@ class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpd
metrics("spilledRows") += operatorMetrics.spilledRows
metrics("spilledPartitions") += operatorMetrics.spilledPartitions
metrics("spilledFiles") += operatorMetrics.spilledFiles
+    if (TaskResources.inSparkTask()) {
+      SparkMetricsUtil.incMemoryBytesSpilled(
+        TaskResources.getLocalTaskContext().taskMetrics(),
+        operatorMetrics.spilledInputBytes)
+      SparkMetricsUtil.incDiskBytesSpilled(
+        TaskResources.getLocalTaskContext().taskMetrics(),
+        operatorMetrics.spilledBytes)
+    }
}
}
}
24 changes: 24 additions & 0 deletions …/org/apache/spark/sql/utils/SparkMetricsUtil.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.utils
+
+import org.apache.spark.executor.TaskMetrics
+
+object SparkMetricsUtil {
+  def incMemoryBytesSpilled(task: TaskMetrics, v: Long): Unit = task.incMemoryBytesSpilled(v)
+  def incDiskBytesSpilled(task: TaskMetrics, v: Long): Unit = task.incDiskBytesSpilled(v)
+}
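
Why this helper lives under org.apache.spark.sql.utils rather than a Gluten package: TaskMetrics.incMemoryBytesSpilled and incDiskBytesSpilled are private[spark] in Spark, so they can only be called from code compiled into an org.apache.spark package. The one-line forwarders above expose them to the Gluten metrics updaters as a public API.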
