Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Add write IO metrics for WriteFiles #7011

Merged
merged 1 commit into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"physicalWrittenBytes" -> SQLMetrics.createSizeMetric(
sparkContext,
"number of written bytes"),
"writeIONanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of write IO"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of write"),
"numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files")
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
assert(write.isDefined)
val metrics = write.get.metrics
assert(metrics("physicalWrittenBytes").value > 0)
assert(metrics("writeIONanos").value > 0)
assert(metrics("numWrittenFiles").value == 1)
}
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
metricsBuilderClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/metrics/Metrics;");

metricsBuilderConstructor = getMethodIdOrError(
env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");
env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");

nativeColumnarToRowInfoClass =
createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
Expand Down Expand Up @@ -496,6 +496,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIter
longArray[Metrics::kIoWaitTime],
longArray[Metrics::kPreloadSplits],
longArray[Metrics::kPhysicalWrittenBytes],
longArray[Metrics::kWriteIOTime],
longArray[Metrics::kNumWrittenFiles]);

JNI_METHOD_END(nullptr)
Expand Down
1 change: 1 addition & 0 deletions cpp/core/utils/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ struct Metrics {

// Write metrics.
kPhysicalWrittenBytes,
kWriteIOTime,
kNumWrittenFiles,

// The end of enum items.
Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const std::string kRemainingFilterTime = "totalRemainingFilterTime";
const std::string kIoWaitTime = "ioWaitNanos";
const std::string kPreloadSplits = "readyPreloadedSplits";
const std::string kNumWrittenFiles = "numWrittenFiles";
const std::string kWriteIOTime = "writeIOTime";

// others
const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__";
Expand Down Expand Up @@ -391,6 +392,7 @@ void WholeStageResultIterator::collectMetrics() {
metrics_->get(Metrics::kNumWrittenFiles)[metricIndex] =
runtimeMetric("sum", entry.second->customStats, kNumWrittenFiles);
metrics_->get(Metrics::kPhysicalWrittenBytes)[metricIndex] = second->physicalWrittenBytes;
metrics_->get(Metrics::kWriteIOTime)[metricIndex] = runtimeMetric("sum", second->customStats, kWriteIOTime);

metricIndex += 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class Metrics implements IMetrics {
public long[] preloadSplits;

public long[] physicalWrittenBytes;

public long[] writeIOTime;
public long[] numWrittenFiles;

public SingleMetric singleMetric = new SingleMetric();
Expand Down Expand Up @@ -88,6 +88,7 @@ public Metrics(
long[] ioWaitTime,
long[] preloadSplits,
long[] physicalWrittenBytes,
long[] writeIOTime,
long[] numWrittenFiles) {
this.inputRows = inputRows;
this.inputVectors = inputVectors;
Expand Down Expand Up @@ -120,6 +121,7 @@ public Metrics(
this.ioWaitTime = ioWaitTime;
this.preloadSplits = preloadSplits;
this.physicalWrittenBytes = physicalWrittenBytes;
this.writeIOTime = writeIOTime;
this.numWrittenFiles = numWrittenFiles;
}

Expand Down Expand Up @@ -159,6 +161,7 @@ public OperatorMetrics getOperatorMetrics(int index) {
ioWaitTime[index],
preloadSplits[index],
physicalWrittenBytes[index],
writeIOTime[index],
numWrittenFiles[index]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class OperatorMetrics implements IOperatorMetrics {
public long preloadSplits;

public long physicalWrittenBytes;

public long writeIOTime;
public long numWrittenFiles;

/** Create an instance for operator metrics. */
Expand Down Expand Up @@ -83,6 +83,7 @@ public OperatorMetrics(
long ioWaitTime,
long preloadSplits,
long physicalWrittenBytes,
long writeIOTime,
long numWrittenFiles) {
this.inputRows = inputRows;
this.inputVectors = inputVectors;
Expand Down Expand Up @@ -114,6 +115,7 @@ public OperatorMetrics(
this.ioWaitTime = ioWaitTime;
this.preloadSplits = preloadSplits;
this.physicalWrittenBytes = physicalWrittenBytes;
this.writeIOTime = writeIOTime;
this.numWrittenFiles = numWrittenFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ object MetricsUtil extends Logging {
val outputBytes = operatorMetrics.get(0).outputBytes

val physicalWrittenBytes = operatorMetrics.get(0).physicalWrittenBytes
val writeIOTime = operatorMetrics.get(0).writeIOTime

var cpuCount: Long = 0
var wallNanos: Long = 0
Expand Down Expand Up @@ -182,6 +183,7 @@ object MetricsUtil extends Logging {
ioWaitTime,
preloadSplits,
physicalWrittenBytes,
writeIOTime,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps you missed the merge code?

numWrittenFiles
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class WriteFilesMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metr
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
metrics("physicalWrittenBytes") += operatorMetrics.physicalWrittenBytes
metrics("writeIONanos") += operatorMetrics.writeIOTime
metrics("wallNanos") += operatorMetrics.wallNanos
metrics("numWrittenFiles") += operatorMetrics.numWrittenFiles
}
Expand Down
Loading