From 805b8804a53fc66c8b840f7007650828626a7db7 Mon Sep 17 00:00:00 2001 From: Jin Chengcheng <79615558+jinchengchenghh@users.noreply.github.com> Date: Wed, 24 Aug 2022 10:45:17 +0800 Subject: [PATCH] [OPPRO-167] Add the metric velox to arrow time (#332) * add the metric velox to arrow time --- cpp/src/compute/exec_backend.h | 15 +++++++++++++-- cpp/src/jni/jni_wrapper.cc | 11 +++++++---- cpp/src/memory/columnar_batch.h | 7 +++++++ cpp/src/utils/metrics.h | 1 + cpp/velox/compute/VeloxPlanConverter.cc | 18 ++++++------------ cpp/velox/compute/VeloxPlanConverter.h | 19 ++++++++++--------- .../io/glutenproject/vectorized/Metrics.java | 13 ++++++++++++- .../vectorized/OperatorMetrics.java | 1 + .../execution/HashJoinExecTransformer.scala | 10 ++++++---- .../execution/WholeStageTransformerExec.scala | 4 +++- 10 files changed, 66 insertions(+), 33 deletions(-) diff --git a/cpp/src/compute/exec_backend.h b/cpp/src/compute/exec_backend.h index e68007d3d4da..7c2cc27e4902 100644 --- a/cpp/src/compute/exec_backend.h +++ b/cpp/src/compute/exec_backend.h @@ -106,7 +106,9 @@ class ExecBackendBase : public std::enable_shared_from_this { rb, memory_pool); } - virtual std::shared_ptr GetMetrics(void* raw_iter) { + virtual std::shared_ptr GetMetrics( + void* raw_iter, + int64_t exportNanos) { return nullptr; }; @@ -280,11 +282,19 @@ class GlutenResultIterator std::shared_ptr GetMetrics() { if (backend_) { - return backend_->GetMetrics(raw_iter_); + return backend_->GetMetrics(raw_iter_, exportNanos_); } return nullptr; } + void setExportNanos(int64_t exportNanos) { + exportNanos_ = exportNanos; + } + + int64_t getExportNanos() { + return exportNanos_; + } + private: template class Wrapper { @@ -303,6 +313,7 @@ class GlutenResultIterator std::unique_ptr iter_; std::shared_ptr next_; std::shared_ptr backend_; + int64_t exportNanos_; inline void CheckValid() { if (iter_ == nullptr) { diff --git a/cpp/src/jni/jni_wrapper.cc b/cpp/src/jni/jni_wrapper.cc index 4b622b913c6c..7b5faceaaf26 100644 --- a/cpp/src/jni/jni_wrapper.cc +++ b/cpp/src/jni/jni_wrapper.cc @@ -257,7 +257,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { env, metrics_builder_class, "", - "([J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V"); + "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J)V"); serialized_arrow_array_iterator_class = CreateGlobalClassReferenceOrError( env, "Lio/glutenproject/vectorized/ArrowInIterator;"); @@ -414,10 +414,12 @@ Java_io_glutenproject_vectorized_ArrowOutIterator_nativeNext( if (!iter->HasNext()) { return false; } + + auto batch = iter->Next(); + auto array = batch->exportToArrow(); + iter->setExportNanos(batch->getExportNanos()); // todo - ArrowArrayMove( - iter->Next()->exportToArrow().get(), - reinterpret_cast(c_array)); + ArrowArrayMove(array.get(), reinterpret_cast(c_array)); return true; JNI_METHOD_END(false) } @@ -498,6 +500,7 @@ Java_io_glutenproject_vectorized_ArrowOutIterator_nativeFetchMetrics( outputBytes, count, wallNanos, + metrics->veloxToArrow, peakMemoryBytes, numMemoryAllocations, numDynamicFiltersProduced, diff --git a/cpp/src/memory/columnar_batch.h b/cpp/src/memory/columnar_batch.h index 8bfe00275771..5c3fc4c47acb 100644 --- a/cpp/src/memory/columnar_batch.h +++ b/cpp/src/memory/columnar_batch.h @@ -42,9 +42,16 @@ class GlutenColumnarBatch { virtual std::shared_ptr exportToArrow() = 0; + virtual int64_t getExportNanos() const { + return exportNanos_; + }; + private: int32_t numColumns; int32_t numRows; + + protected: + int64_t exportNanos_; }; class GlutenArrowArrayColumnarBatch : public GlutenColumnarBatch { diff --git a/cpp/src/utils/metrics.h b/cpp/src/utils/metrics.h index 1ef24435c463..fea7651d22d3 100644 --- a/cpp/src/utils/metrics.h +++ b/cpp/src/utils/metrics.h @@ -32,6 +32,7 @@ struct Metrics { // CpuWallTiming. long* count; long* wallNanos; + long veloxToArrow; long* peakMemoryBytes; long* numMemoryAllocations; diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index 2d2428aa16d1..99f634db61e6 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -367,18 +367,6 @@ void VeloxPlanConverter::cacheOutputSchema( GLUTEN_ASSIGN_OR_THROW(output_schema_, arrow::ImportSchema(&arrowSchema)); } -void WholeStageResIter::toArrowArray(const RowVectorPtr& rv, ArrowArray& out) { - // Make sure to load lazy vector if not loaded already. - for (auto& child : rv->children()) { - child->loadedVector(); - } - - RowVectorPtr copy = std::dynamic_pointer_cast( - BaseVector::create(rv->type(), rv->size(), getPool())); - copy->copy(rv.get(), 0, 0, rv->size()); - exportToArrow(copy, out, getPool()); -} - arrow::Result> WholeStageResIter::Next() { addSplits_(task_.get()); @@ -651,6 +639,7 @@ std::string GlutenVeloxColumnarBatch::GetType() { } std::shared_ptr GlutenVeloxColumnarBatch::exportToArrow() { + auto startTime = std::chrono::steady_clock::now(); ArrowArray out; // Make sure to load lazy vector if not loaded already. for (auto& child : rowVector_->children()) { @@ -665,6 +654,11 @@ std::shared_ptr GlutenVeloxColumnarBatch::exportToArrow() { facebook::velox::exportToArrow( copy, out, gluten::memory::GetDefaultWrappedVeloxMemoryPool().get()); + auto endTime = std::chrono::steady_clock::now(); + auto duration = + std::chrono::duration_cast(endTime - startTime) + .count(); + exportNanos_ += duration; return std::make_shared(out); } diff --git a/cpp/velox/compute/VeloxPlanConverter.h b/cpp/velox/compute/VeloxPlanConverter.h index 963e0de91323..e908c0d77a7f 100644 --- a/cpp/velox/compute/VeloxPlanConverter.h +++ b/cpp/velox/compute/VeloxPlanConverter.h @@ -89,6 +89,10 @@ class GlutenVeloxColumnarBatch : public gluten::memory::GlutenColumnarBatch { void ReleasePayload() override; std::string GetType() override; + /// This method converts Velox RowVector into Arrow Array based on Velox's + /// Arrow conversion implementation, in which memcopy is not needed for + /// fixed-width data types, but is conducted in String conversion. The output + /// array will be the input of Columnar Shuffle or Velox-to-Row. std::shared_ptr exportToArrow() override; private: @@ -109,8 +113,9 @@ class WholeStageResIter { arrow::Result> Next(); - std::shared_ptr GetMetrics() { + std::shared_ptr GetMetrics(int64_t exportNanos) { collectMetrics(); + metrics_->veloxToArrow = exportNanos; return metrics_; } @@ -126,12 +131,6 @@ class WholeStageResIter { std::shared_ptr planNode_; private: - /// This method converts Velox RowVector into Arrow Array based on Velox's - /// Arrow conversion implementation, in which memcopy is not needed for - /// fixed-width data types, but is conducted in String conversion. The output - /// array will be the input of Columnar Shuffle or Velox-to-Row. - void toArrowArray(const RowVectorPtr& rv, ArrowArray& out); - /// Get all the children plan node ids with postorder traversal. void getOrderedNodeIds( const std::shared_ptr& planNode, @@ -148,6 +147,7 @@ class WholeStageResIter { std::shared_ptr pool_; std::shared_ptr metrics_ = nullptr; + int64_t metricVeloxToArrowNanos_ = 0; /// All the children plan node ids with postorder traversal. std::vector orderedNodeIds_; @@ -207,9 +207,10 @@ class VeloxPlanConverter : public gluten::ExecBackendBase { std::vector& scanIds, std::vector& streamIds); - std::shared_ptr GetMetrics(void* raw_iter) override { + std::shared_ptr GetMetrics(void* raw_iter, int64_t exportNanos) + override { auto iter = static_cast(raw_iter); - return iter->GetMetrics(); + return iter->GetMetrics(exportNanos); } std::shared_ptr GetOutputSchema() override; diff --git a/jvm/src/main/java/io/glutenproject/vectorized/Metrics.java b/jvm/src/main/java/io/glutenproject/vectorized/Metrics.java index 571b6a155174..584f16ae2059 100644 --- a/jvm/src/main/java/io/glutenproject/vectorized/Metrics.java +++ b/jvm/src/main/java/io/glutenproject/vectorized/Metrics.java @@ -33,6 +33,7 @@ public class Metrics { public long[] numDynamicFiltersProduced; public long[] numDynamicFiltersAccepted; public long[] numReplacedWithDynamicFilterRows; + public SingleMetric singleMetric = new SingleMetric(); /** * Create an instance for native metrics. @@ -40,7 +41,8 @@ public class Metrics { public Metrics( long[] inputRows, long[] inputVectors, long[] inputBytes, long[] rawInputRows, long[] rawInputBytes, long[] outputRows, long[] outputVectors, long[] outputBytes, - long[] count, long[] wallNanos, long[] peakMemoryBytes, long[] numMemoryAllocations, + long[] count, long[] wallNanos, long veloxToArrow, long[] peakMemoryBytes, + long[] numMemoryAllocations, long[] numDynamicFiltersProduced, long[] numDynamicFiltersAccepted, long[] numReplacedWithDynamicFilterRows) { this.inputRows = inputRows; @@ -53,6 +55,7 @@ public Metrics( this.outputBytes = outputBytes; this.count = count; this.wallNanos = wallNanos; + this.singleMetric.veloxToArrow = veloxToArrow; this.peakMemoryBytes = peakMemoryBytes; this.numMemoryAllocations = numMemoryAllocations; this.numDynamicFiltersProduced = numDynamicFiltersProduced; @@ -82,4 +85,12 @@ public OperatorMetrics getOperatorMetrics(int index) { numDynamicFiltersAccepted[index], numReplacedWithDynamicFilterRows[index]); } + + public SingleMetric getSingleMetrics() { + return singleMetric; + } + + public static class SingleMetric { + public long veloxToArrow; + } } diff --git a/jvm/src/main/java/io/glutenproject/vectorized/OperatorMetrics.java b/jvm/src/main/java/io/glutenproject/vectorized/OperatorMetrics.java index 7bd55e1229a5..6038c2ba8106 100644 --- a/jvm/src/main/java/io/glutenproject/vectorized/OperatorMetrics.java +++ b/jvm/src/main/java/io/glutenproject/vectorized/OperatorMetrics.java @@ -28,6 +28,7 @@ public class OperatorMetrics { public long outputBytes; public long count; public long wallNanos; + public long peakMemoryBytes; public long numMemoryAllocations; public long numDynamicFiltersProduced; diff --git a/jvm/src/main/scala/io/glutenproject/execution/HashJoinExecTransformer.scala b/jvm/src/main/scala/io/glutenproject/execution/HashJoinExecTransformer.scala index a581c19d3d2b..4e91712e7888 100644 --- a/jvm/src/main/scala/io/glutenproject/execution/HashJoinExecTransformer.scala +++ b/jvm/src/main/scala/io/glutenproject/execution/HashJoinExecTransformer.scala @@ -18,13 +18,10 @@ package io.glutenproject.execution import java.{lang, util} - import scala.collection.JavaConverters._ import scala.util.control.Breaks.{break, breakable} - import com.google.common.collect.Lists import com.google.protobuf.{Any, ByteString} - import io.glutenproject.GlutenConfig import io.glutenproject.execution.HashJoinLikeExecTransformer.{makeAndExpression, makeEqualToExpression} import io.glutenproject.expression._ @@ -34,9 +31,9 @@ import io.glutenproject.substrait.expression.{ExpressionBuilder, ExpressionNode} import io.glutenproject.substrait.extensions.{AdvancedExtensionNode, ExtensionBuilder} import io.glutenproject.substrait.plan.PlanBuilder import io.glutenproject.substrait.rel.{RelBuilder, RelNode} +import io.glutenproject.vectorized.Metrics.SingleMetric import io.glutenproject.vectorized.{ExpressionEvaluator, OperatorMetrics} import io.substrait.proto.JoinRel - import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -84,6 +81,8 @@ abstract class HashJoinLikeExecTransformer(leftKeys: Seq[Expression], sparkContext, "stream side cpu wall time count"), "streamWallNanos" -> SQLMetrics.createNanoTimingMetric( sparkContext, "totaltime_stream_input"), + "streamVeloxToArrow" -> SQLMetrics.createNanoTimingMetric( + sparkContext, "totaltime_velox_to_arrow_converter"), "streamPeakMemoryBytes" -> SQLMetrics.createSizeMetric( sparkContext, "stream side peak memory bytes"), "streamNumMemoryAllocations" -> SQLMetrics.createMetric( @@ -258,6 +257,7 @@ abstract class HashJoinLikeExecTransformer(leftKeys: Seq[Expression], val streamOutputBytes: SQLMetric = longMetric("streamOutputBytes") val streamCount: SQLMetric = longMetric("streamCount") val streamWallNanos: SQLMetric = longMetric("streamWallNanos") + val streamVeloxToArrow: SQLMetric = longMetric("streamVeloxToArrow") val streamPeakMemoryBytes: SQLMetric = longMetric("streamPeakMemoryBytes") val streamNumMemoryAllocations: SQLMetric = longMetric("streamNumMemoryAllocations") @@ -401,6 +401,7 @@ abstract class HashJoinLikeExecTransformer(leftKeys: Seq[Expression], } def updateJoinMetrics(joinMetrics: java.util.ArrayList[OperatorMetrics], + singleMetrics: SingleMetric, joinParams: JoinParams): Unit = { var idx = 0 if (joinParams.postProjectionNeeded) { @@ -516,6 +517,7 @@ abstract class HashJoinLikeExecTransformer(leftKeys: Seq[Expression], streamOutputBytes += metrics.outputBytes streamCount += metrics.count streamWallNanos += metrics.wallNanos + streamVeloxToArrow += singleMetrics.veloxToArrow streamPeakMemoryBytes += metrics.peakMemoryBytes streamNumMemoryAllocations += metrics.numMemoryAllocations idx += 1 diff --git a/jvm/src/main/scala/io/glutenproject/execution/WholeStageTransformerExec.scala b/jvm/src/main/scala/io/glutenproject/execution/WholeStageTransformerExec.scala index a6b3bdb69808..378aad3594fd 100644 --- a/jvm/src/main/scala/io/glutenproject/execution/WholeStageTransformerExec.scala +++ b/jvm/src/main/scala/io/glutenproject/execution/WholeStageTransformerExec.scala @@ -26,6 +26,7 @@ import io.glutenproject.expression._ import io.glutenproject.substrait.{AggregationParams, JoinParams, SubstraitContext} import io.glutenproject.substrait.plan.{PlanBuilder, PlanNode} import io.glutenproject.substrait.rel.RelNode +import io.glutenproject.vectorized.Metrics.SingleMetric import io.glutenproject.vectorized._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -447,7 +448,8 @@ case class WholeStageTransformerExec(child: SparkPlan)(val transformStageId: Int // Therefore, fetch one more suite of metrics here. operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx)) curMetricsIdx -= 1 - joinTransformer.updateJoinMetrics(operatorMetrics, joinParamsMap.get(operatorIdx)) + joinTransformer.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics, + joinParamsMap.get(operatorIdx)) var newOperatorIdx: java.lang.Long = operatorIdx - 1 var newMetricsIdx: Int = curMetricsIdx