Skip to content

Commit

Permalink
[OPPRO-167] Add the metric velox to arrow time (oap-project#332)
Browse files Browse the repository at this point in the history
* add the metric velox to arrow time
  • Loading branch information
jinchengchenghh authored Aug 24, 2022
1 parent 4cdaabd commit 805b880
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 33 deletions.
15 changes: 13 additions & 2 deletions cpp/src/compute/exec_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ class ExecBackendBase : public std::enable_shared_from_this<ExecBackendBase> {
rb, memory_pool);
}

virtual std::shared_ptr<Metrics> GetMetrics(void* raw_iter) {
/// Returns the metrics gathered for the backend iterator behind `raw_iter`.
/// `exportNanos` is the Arrow export time, in nanoseconds, supplied by the
/// caller so a backend can include it in the returned metrics.
/// The base implementation reports nothing and returns nullptr.
virtual std::shared_ptr<Metrics>
GetMetrics(void* raw_iter, int64_t exportNanos) {
  return nullptr;
}

Expand Down Expand Up @@ -280,11 +282,19 @@ class GlutenResultIterator

std::shared_ptr<Metrics> GetMetrics() {
if (backend_) {
return backend_->GetMetrics(raw_iter_);
return backend_->GetMetrics(raw_iter_, exportNanos_);
}
return nullptr;
}

/// Stores the Arrow export time, in nanoseconds, that GetMetrics() later
/// forwards to the backend.
void setExportNanos(int64_t exportNanos) { exportNanos_ = exportNanos; }

/// Returns the Arrow export time, in nanoseconds, last stored through
/// setExportNanos().
// NOTE(review): this only reads a member; consider marking it const, as
// GlutenColumnarBatch::getExportNanos() already is.
int64_t getExportNanos() { return exportNanos_; }

private:
template <typename T>
class Wrapper {
Expand All @@ -303,6 +313,7 @@ class GlutenResultIterator
std::unique_ptr<GlutenIterator> iter_;
std::shared_ptr<memory::GlutenColumnarBatch> next_;
std::shared_ptr<ExecBackendBase> backend_;
// Arrow export time in nanoseconds, as last reported via setExportNanos().
// Zero-initialized so GetMetrics() never forwards an indeterminate value when
// metrics are fetched before any batch has been exported.
int64_t exportNanos_ = 0;

inline void CheckValid() {
if (iter_ == nullptr) {
Expand Down
11 changes: 7 additions & 4 deletions cpp/src/jni/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
env,
metrics_builder_class,
"<init>",
"([J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J)V");

serialized_arrow_array_iterator_class = CreateGlobalClassReferenceOrError(
env, "Lio/glutenproject/vectorized/ArrowInIterator;");
Expand Down Expand Up @@ -414,10 +414,12 @@ Java_io_glutenproject_vectorized_ArrowOutIterator_nativeNext(
if (!iter->HasNext()) {
return false;
}

auto batch = iter->Next();
auto array = batch->exportToArrow();
iter->setExportNanos(batch->getExportNanos());
// todo
ArrowArrayMove(
iter->Next()->exportToArrow().get(),
reinterpret_cast<struct ArrowArray*>(c_array));
ArrowArrayMove(array.get(), reinterpret_cast<struct ArrowArray*>(c_array));
return true;
JNI_METHOD_END(false)
}
Expand Down Expand Up @@ -498,6 +500,7 @@ Java_io_glutenproject_vectorized_ArrowOutIterator_nativeFetchMetrics(
outputBytes,
count,
wallNanos,
metrics->veloxToArrow,
peakMemoryBytes,
numMemoryAllocations,
numDynamicFiltersProduced,
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/memory/columnar_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,16 @@ class GlutenColumnarBatch {

virtual std::shared_ptr<ArrowArray> exportToArrow() = 0;

/// Returns exportNanos_: the export-to-Arrow time, in nanoseconds, recorded
/// for this batch.
virtual int64_t getExportNanos() const { return exportNanos_; }

private:
int32_t numColumns;
int32_t numRows;

protected:
// Time, in nanoseconds, spent exporting this batch to Arrow; read back through
// getExportNanos(). Must start at zero because exportToArrow() implementations
// accumulate into it with `+=`.
int64_t exportNanos_ = 0;
};

class GlutenArrowArrayColumnarBatch : public GlutenColumnarBatch {
Expand Down
1 change: 1 addition & 0 deletions cpp/src/utils/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct Metrics {
// CpuWallTiming.
long* count;
long* wallNanos;
long veloxToArrow;

long* peakMemoryBytes;
long* numMemoryAllocations;
Expand Down
18 changes: 6 additions & 12 deletions cpp/velox/compute/VeloxPlanConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,18 +367,6 @@ void VeloxPlanConverter::cacheOutputSchema(
GLUTEN_ASSIGN_OR_THROW(output_schema_, arrow::ImportSchema(&arrowSchema));
}

void WholeStageResIter::toArrowArray(const RowVectorPtr& rv, ArrowArray& out) {
// Make sure to load lazy vector if not loaded already.
for (auto& child : rv->children()) {
child->loadedVector();
}

RowVectorPtr copy = std::dynamic_pointer_cast<RowVector>(
BaseVector::create(rv->type(), rv->size(), getPool()));
copy->copy(rv.get(), 0, 0, rv->size());
exportToArrow(copy, out, getPool());
}

arrow::Result<std::shared_ptr<GlutenVeloxColumnarBatch>>
WholeStageResIter::Next() {
addSplits_(task_.get());
Expand Down Expand Up @@ -651,6 +639,7 @@ std::string GlutenVeloxColumnarBatch::GetType() {
}

std::shared_ptr<ArrowArray> GlutenVeloxColumnarBatch::exportToArrow() {
auto startTime = std::chrono::steady_clock::now();
ArrowArray out;
// Make sure to load lazy vector if not loaded already.
for (auto& child : rowVector_->children()) {
Expand All @@ -665,6 +654,11 @@ std::shared_ptr<ArrowArray> GlutenVeloxColumnarBatch::exportToArrow() {
facebook::velox::exportToArrow(
copy, out, gluten::memory::GetDefaultWrappedVeloxMemoryPool().get());

auto endTime = std::chrono::steady_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - startTime)
.count();
exportNanos_ += duration;
return std::make_shared<ArrowArray>(out);
}

Expand Down
19 changes: 10 additions & 9 deletions cpp/velox/compute/VeloxPlanConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ class GlutenVeloxColumnarBatch : public gluten::memory::GlutenColumnarBatch {

void ReleasePayload() override;
std::string GetType() override;
/// This method converts Velox RowVector into Arrow Array based on Velox's
/// Arrow conversion implementation, in which memcopy is not needed for
/// fixed-width data types, but is conducted in String conversion. The output
/// array will be the input of Columnar Shuffle or Velox-to-Row.
std::shared_ptr<ArrowArray> exportToArrow() override;

private:
Expand All @@ -109,8 +113,9 @@ class WholeStageResIter {

arrow::Result<std::shared_ptr<GlutenVeloxColumnarBatch>> Next();

std::shared_ptr<Metrics> GetMetrics() {
/// Calls collectMetrics(), then stores the caller-supplied Velox-to-Arrow
/// export time (`exportNanos`, in nanoseconds) in the metrics object and
/// returns that object.
/// NOTE(review): metrics_ is declared with a nullptr default; this assumes
/// collectMetrics() always allocates it before the dereference below — confirm.
std::shared_ptr<Metrics> GetMetrics(int64_t exportNanos) {
collectMetrics();
metrics_->veloxToArrow = exportNanos;
return metrics_;
}

Expand All @@ -126,12 +131,6 @@ class WholeStageResIter {
std::shared_ptr<const core::PlanNode> planNode_;

private:
/// This method converts Velox RowVector into Arrow Array based on Velox's
/// Arrow conversion implementation, in which memcopy is not needed for
/// fixed-width data types, but is conducted in String conversion. The output
/// array will be the input of Columnar Shuffle or Velox-to-Row.
void toArrowArray(const RowVectorPtr& rv, ArrowArray& out);

/// Get all the children plan node ids with postorder traversal.
void getOrderedNodeIds(
const std::shared_ptr<const core::PlanNode>& planNode,
Expand All @@ -148,6 +147,7 @@ class WholeStageResIter {
std::shared_ptr<memory::MemoryPool> pool_;

std::shared_ptr<Metrics> metrics_ = nullptr;
int64_t metricVeloxToArrowNanos_ = 0;

/// All the children plan node ids with postorder traversal.
std::vector<core::PlanNodeId> orderedNodeIds_;
Expand Down Expand Up @@ -207,9 +207,10 @@ class VeloxPlanConverter : public gluten::ExecBackendBase {
std::vector<core::PlanNodeId>& scanIds,
std::vector<core::PlanNodeId>& streamIds);

std::shared_ptr<Metrics> GetMetrics(void* raw_iter) override {
std::shared_ptr<Metrics> GetMetrics(void* raw_iter, int64_t exportNanos)
override {
auto iter = static_cast<WholeStageResIter*>(raw_iter);
return iter->GetMetrics();
return iter->GetMetrics(exportNanos);
}

std::shared_ptr<arrow::Schema> GetOutputSchema() override;
Expand Down
13 changes: 12 additions & 1 deletion jvm/src/main/java/io/glutenproject/vectorized/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ public class Metrics {
public long[] numDynamicFiltersProduced;
public long[] numDynamicFiltersAccepted;
public long[] numReplacedWithDynamicFilterRows;
public SingleMetric singleMetric = new SingleMetric();

/**
* Create an instance for native metrics.
*/
public Metrics(
long[] inputRows, long[] inputVectors, long[] inputBytes, long[] rawInputRows,
long[] rawInputBytes, long[] outputRows, long[] outputVectors, long[] outputBytes,
long[] count, long[] wallNanos, long[] peakMemoryBytes, long[] numMemoryAllocations,
long[] count, long[] wallNanos, long veloxToArrow, long[] peakMemoryBytes,
long[] numMemoryAllocations,
long[] numDynamicFiltersProduced, long[] numDynamicFiltersAccepted,
long[] numReplacedWithDynamicFilterRows) {
this.inputRows = inputRows;
Expand All @@ -53,6 +55,7 @@ public Metrics(
this.outputBytes = outputBytes;
this.count = count;
this.wallNanos = wallNanos;
this.singleMetric.veloxToArrow = veloxToArrow;
this.peakMemoryBytes = peakMemoryBytes;
this.numMemoryAllocations = numMemoryAllocations;
this.numDynamicFiltersProduced = numDynamicFiltersProduced;
Expand Down Expand Up @@ -82,4 +85,12 @@ public OperatorMetrics getOperatorMetrics(int index) {
numDynamicFiltersAccepted[index],
numReplacedWithDynamicFilterRows[index]);
}

/**
 * Returns the holder for metrics that are reported as one value instead of a
 * per-operator array.
 */
public SingleMetric getSingleMetrics() {
  return this.singleMetric;
}

/** Holder for metrics that are reported as one value instead of a per-operator array. */
public static class SingleMetric {
// Time spent converting Velox batches to Arrow, in nanoseconds as measured on the native side.
public long veloxToArrow;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class OperatorMetrics {
public long outputBytes;
public long count;
public long wallNanos;

public long peakMemoryBytes;
public long numMemoryAllocations;
public long numDynamicFiltersProduced;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@
package io.glutenproject.execution

import java.{lang, util}

import scala.collection.JavaConverters._
import scala.util.control.Breaks.{break, breakable}

import com.google.common.collect.Lists
import com.google.protobuf.{Any, ByteString}

import io.glutenproject.GlutenConfig
import io.glutenproject.execution.HashJoinLikeExecTransformer.{makeAndExpression, makeEqualToExpression}
import io.glutenproject.expression._
Expand All @@ -34,9 +31,9 @@ import io.glutenproject.substrait.expression.{ExpressionBuilder, ExpressionNode}
import io.glutenproject.substrait.extensions.{AdvancedExtensionNode, ExtensionBuilder}
import io.glutenproject.substrait.plan.PlanBuilder
import io.glutenproject.substrait.rel.{RelBuilder, RelNode}
import io.glutenproject.vectorized.Metrics.SingleMetric
import io.glutenproject.vectorized.{ExpressionEvaluator, OperatorMetrics}
import io.substrait.proto.JoinRel

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -84,6 +81,8 @@ abstract class HashJoinLikeExecTransformer(leftKeys: Seq[Expression],
sparkContext, "stream side cpu wall time count"),
"streamWallNanos" -> SQLMetrics.createNanoTimingMetric(
sparkContext, "totaltime_stream_input"),
"streamVeloxToArrow" -> SQLMetrics.createNanoTimingMetric(
sparkContext, "totaltime_velox_to_arrow_converter"),
"streamPeakMemoryBytes" -> SQLMetrics.createSizeMetric(
sparkContext, "stream side peak memory bytes"),
"streamNumMemoryAllocations" -> SQLMetrics.createMetric(
Expand Down Expand Up @@ -258,6 +257,7 @@ abstract class HashJoinLikeExecTransformer(leftKeys: Seq[Expression],
val streamOutputBytes: SQLMetric = longMetric("streamOutputBytes")
val streamCount: SQLMetric = longMetric("streamCount")
val streamWallNanos: SQLMetric = longMetric("streamWallNanos")
val streamVeloxToArrow: SQLMetric = longMetric("streamVeloxToArrow")
val streamPeakMemoryBytes: SQLMetric = longMetric("streamPeakMemoryBytes")
val streamNumMemoryAllocations: SQLMetric = longMetric("streamNumMemoryAllocations")

Expand Down Expand Up @@ -401,6 +401,7 @@ abstract class HashJoinLikeExecTransformer(leftKeys: Seq[Expression],
}

def updateJoinMetrics(joinMetrics: java.util.ArrayList[OperatorMetrics],
singleMetrics: SingleMetric,
joinParams: JoinParams): Unit = {
var idx = 0
if (joinParams.postProjectionNeeded) {
Expand Down Expand Up @@ -516,6 +517,7 @@ abstract class HashJoinLikeExecTransformer(leftKeys: Seq[Expression],
streamOutputBytes += metrics.outputBytes
streamCount += metrics.count
streamWallNanos += metrics.wallNanos
streamVeloxToArrow += singleMetrics.veloxToArrow
streamPeakMemoryBytes += metrics.peakMemoryBytes
streamNumMemoryAllocations += metrics.numMemoryAllocations
idx += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.glutenproject.expression._
import io.glutenproject.substrait.{AggregationParams, JoinParams, SubstraitContext}
import io.glutenproject.substrait.plan.{PlanBuilder, PlanNode}
import io.glutenproject.substrait.rel.RelNode
import io.glutenproject.vectorized.Metrics.SingleMetric
import io.glutenproject.vectorized._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -447,7 +448,8 @@ case class WholeStageTransformerExec(child: SparkPlan)(val transformStageId: Int
// Therefore, fetch one more suite of metrics here.
operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx))
curMetricsIdx -= 1
joinTransformer.updateJoinMetrics(operatorMetrics, joinParamsMap.get(operatorIdx))
joinTransformer.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics,
joinParamsMap.get(operatorIdx))

var newOperatorIdx: java.lang.Long = operatorIdx - 1
var newMetricsIdx: Int = curMetricsIdx
Expand Down

0 comments on commit 805b880

Please sign in to comment.