[GLUTEN-7953][VL] Fetch and dump all inputs for micro benchmark on middle stage begin (#7998)
marin-ma authored Nov 25, 2024
1 parent 114a1da commit 58e7d83
Showing 31 changed files with 456 additions and 405 deletions.
8 changes: 5 additions & 3 deletions .github/workflows/velox_backend.yml
@@ -969,9 +969,11 @@ jobs:
run: |
$MVN_CMD test -Pspark-3.5 -Pbackends-velox -pl backends-velox -am \
-DtagsToInclude="org.apache.gluten.tags.GenerateExample" -Dtest=none -DfailIfNoTests=false -Dexec.skip
# This test depends on example.json generated by the above mvn test.
cd cpp/build/velox/benchmarks && sudo chmod +x ./generic_benchmark
./generic_benchmark --run-example --with-shuffle --threads 1 --iterations 1
# This test depends on files generated by the above mvn test.
./cpp/build/velox/benchmarks/generic_benchmark --with-shuffle --partitioning hash --threads 1 --iterations 1 \
--conf $(realpath backends-velox/generated-native-benchmark/conf_12_0.ini) \
--plan $(realpath backends-velox/generated-native-benchmark/plan_12_0.json) \
--data $(realpath backends-velox/generated-native-benchmark/data_12_0_0.parquet),$(realpath backends-velox/generated-native-benchmark/data_12_0_1.parquet)
- name: Run UDF test
run: |
# Depends on --build_example=ON.
@@ -19,18 +19,14 @@ package org.apache.gluten.benchmarks
import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.{VeloxWholeStageTransformerSuite, WholeStageTransformer}

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.internal.SQLConf

import org.apache.commons.io.FileUtils
import org.scalatest.Tag

import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import scala.collection.JavaConverters._

object GenerateExample extends Tag("org.apache.gluten.tags.GenerateExample")

@@ -50,6 +46,11 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite {
createTPCHNotNullTables()
}

override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
}

test("Test plan json non-empty - AQE off") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
@@ -67,7 +68,6 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite {
planJson = lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson
assert(planJson.nonEmpty)
}
spark.sparkContext.setLogLevel(logLevel)
}

test("Test plan json non-empty - AQE on") {
@@ -88,70 +88,42 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite {
val planJson = lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson
assert(planJson.nonEmpty)
}
spark.sparkContext.setLogLevel(logLevel)
}

test("generate example", GenerateExample) {
import testImplicits._
withSQLConf(
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
GlutenConfig.CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT.key -> "true"
GlutenConfig.BENCHMARK_SAVE_DIR.key -> generatedPlanDir,
GlutenConfig.BENCHMARK_TASK_STAGEID.key -> "12",
GlutenConfig.BENCHMARK_TASK_PARTITIONID.key -> "0"
) {
logWarning(s"Generating inputs for micro benchmark to $generatedPlanDir")
val q4_lineitem = spark
.sql(s"""
|select l_orderkey from lineitem where l_commitdate < l_receiptdate
|""".stripMargin)
val q4_orders = spark
.sql(s"""
|select o_orderkey, o_orderpriority
| from orders
| where o_orderdate >= '1993-07-01' and o_orderdate < '1993-10-01'
|""".stripMargin)
q4_lineitem
.createOrReplaceTempView("q4_lineitem")
q4_orders
.createOrReplaceTempView("q4_orders")

q4_lineitem
.repartition(1, 'l_orderkey)
.write
.format(outputFileFormat)
.save(generatedPlanDir + "/example_lineitem")
q4_orders
.repartition(1, 'o_orderkey)
.write
.format(outputFileFormat)
.save(generatedPlanDir + "/example_orders")

val df =
spark.sql("""
|select * from q4_orders left semi join q4_lineitem on l_orderkey = o_orderkey
|""".stripMargin)
generateSubstraitJson(df, "example.json")
spark
.sql("""
|select /*+ REPARTITION(1) */
| o_orderpriority,
| count(*) as order_count
|from
| orders
|where
| o_orderdate >= date '1993-07-01'
| and o_orderdate < date '1993-07-01' + interval '3' month
| and exists (
| select /*+ REPARTITION(1) */
| *
| from
| lineitem
| where
| l_orderkey = o_orderkey
| and l_commitdate < l_receiptdate
| )
|group by
| o_orderpriority
|order by
| o_orderpriority
|""".stripMargin)
.foreach(_ => ())
}
spark.sparkContext.setLogLevel(logLevel)
}

def generateSubstraitJson(df: DataFrame, file: String): Unit = {
val executedPlan = df.queryExecution.executedPlan
executedPlan.execute()
val finalPlan =
executedPlan match {
case aqe: AdaptiveSparkPlanExec =>
aqe.executedPlan match {
case s: ShuffleQueryStageExec => s.shuffle.child
case other => other
}
case plan => plan
}
val lastStageTransformer = finalPlan.find(_.isInstanceOf[WholeStageTransformer])
assert(lastStageTransformer.nonEmpty)
val plan =
lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson.split('\n')

val exampleJsonFile = Paths.get(generatedPlanDir, file)
Files.write(exampleJsonFile, plan.toList.asJava, StandardCharsets.UTF_8)
}
}
4 changes: 0 additions & 4 deletions cpp/CMakeLists.txt
@@ -217,10 +217,6 @@ if(ENABLE_IAA)
add_definitions(-DGLUTEN_ENABLE_IAA)
endif()

if(ENABLE_ORC)
add_definitions(-DGLUTEN_ENABLE_ORC)
endif()

#
# Subdirectories
#
3 changes: 3 additions & 0 deletions cpp/core/compute/Runtime.h
@@ -27,6 +27,7 @@
#include "operators/c2r/ColumnarToRow.h"
#include "operators/r2c/RowToColumnar.h"
#include "operators/serializer/ColumnarBatchSerializer.h"
#include "operators/writer/ArrowWriter.h"
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
#include "substrait/plan.pb.h"
@@ -124,6 +125,8 @@ class Runtime : public std::enable_shared_from_this<Runtime> {

virtual void dumpConf(const std::string& path) = 0;

virtual std::shared_ptr<ArrowWriter> createArrowWriter(const std::string& path) = 0;

const std::unordered_map<std::string, std::string>& getConfMap() {
return confMap_;
}
44 changes: 28 additions & 16 deletions cpp/core/jni/JniCommon.cc
@@ -104,22 +104,34 @@ gluten::JniColumnarBatchIterator::~JniColumnarBatchIterator() {
std::shared_ptr<gluten::ColumnarBatch> gluten::JniColumnarBatchIterator::next() {
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) {
checkException(env);
return nullptr; // stream ended
}

checkException(env);
jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext_);
checkException(env);
auto batch = ObjectStore::retrieve<ColumnarBatch>(handle);
if (writer_ != nullptr) {
// save snapshot of the batch to file
std::shared_ptr<ArrowSchema> schema = batch->exportArrowSchema();
std::shared_ptr<ArrowArray> array = batch->exportArrowArray();
auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(), schema.get()));
GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get())));
GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb));
if (!writer_->closed()) {
// Dump all inputs.
while (env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) {
checkException(env);
jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext_);
checkException(env);
auto batch = ObjectStore::retrieve<ColumnarBatch>(handle);

// Save the snapshot of the batch to file.
std::shared_ptr<ArrowSchema> schema = batch->exportArrowSchema();
std::shared_ptr<ArrowArray> array = batch->exportArrowArray();
auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(), schema.get()));
GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get())));
GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb));
}
checkException(env);
GLUTEN_THROW_NOT_OK(writer_->closeWriter());
}
return writer_->retrieveColumnarBatch();
} else {
if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) {
checkException(env);
return nullptr; // stream ended
}
checkException(env);
jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext_);
checkException(env);
return ObjectStore::retrieve<ColumnarBatch>(handle);
}
return batch;
}
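Stripped of the JNI plumbing, the new next() behaves as follows: when a dump writer is attached and still open, it drains the whole upstream iterator into the Parquet file, closes the writer, and from then on serves batches back out of the dump; with no writer, batches pass through one at a time as before. A self-contained sketch of that pattern, using simplified stand-in types rather than Gluten's actual classes:

#include <memory>
#include <vector>

struct Batch {}; // stand-in for gluten::ColumnarBatch

// Stand-in for the upstream JNI batch iterator.
struct Upstream {
  std::vector<std::shared_ptr<Batch>> batches;
  size_t pos = 0;
  bool hasNext() const { return pos < batches.size(); }
  std::shared_ptr<Batch> next() { return batches[pos++]; }
};

// Stand-in for ArrowWriter: records every batch, then replays them.
struct DumpWriter {
  std::vector<std::shared_ptr<Batch>> dumped;
  size_t replayPos = 0;
  bool closed = false;
  void write(std::shared_ptr<Batch> b) { dumped.push_back(std::move(b)); }
  void close() { closed = true; }
  std::shared_ptr<Batch> retrieve() {
    return replayPos < dumped.size() ? dumped[replayPos++] : nullptr;
  }
};

// Mirrors JniColumnarBatchIterator::next() after this change: with a writer,
// dump *all* inputs up front, then serve every batch from the dump.
std::shared_ptr<Batch> next(Upstream& up, DumpWriter* writer) {
  if (writer != nullptr) {
    if (!writer->closed) {
      while (up.hasNext()) {
        writer->write(up.next());
      }
      writer->close();
    }
    return writer->retrieve();
  }
  return up.hasNext() ? up.next() : nullptr; // passthrough when not dumping
}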
1 change: 1 addition & 0 deletions cpp/core/jni/JniCommon.h
@@ -26,6 +26,7 @@
#include "compute/Runtime.h"
#include "config/GlutenConfig.h"
#include "memory/AllocationListener.h"
#include "operators/writer/ArrowWriter.h"
#include "shuffle/rss/RssClient.h"
#include "utils/Compression.h"
#include "utils/Exception.h"
14 changes: 11 additions & 3 deletions cpp/core/jni/JniWrapper.cc
@@ -373,8 +373,16 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
}
saveDir = conf.at(kGlutenSaveDir);
std::filesystem::path f{saveDir};
if (!std::filesystem::exists(f)) {
throw GlutenException("Save input path " + saveDir + " does not exists");
if (std::filesystem::exists(f)) {
if (!std::filesystem::is_directory(f)) {
throw GlutenException("Invalid path for " + kGlutenSaveDir + ": " + saveDir);
}
} else {
std::error_code ec;
std::filesystem::create_directory(f, ec);
if (ec) {
throw GlutenException("Failed to create directory: " + saveDir + ", error message: " + ec.message());
}
}
ctx->dumpConf(saveDir + "/conf" + fileIdentifier + ".ini");
}
@@ -407,7 +415,7 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
std::shared_ptr<ArrowWriter> writer = nullptr;
if (saveInput) {
auto file = saveDir + "/data" + fileIdentifier + "_" + std::to_string(idx) + ".parquet";
writer = std::make_shared<ArrowWriter>(file);
writer = ctx->createArrowWriter(file);
}
jobject iter = env->GetObjectArrayElement(iterArr, idx);
auto arrayIter = makeJniColumnarBatchIterator(env, iter, ctx, writer);
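Combined with the CI step above, this fixes the on-disk layout of a dump: conf_<stage>_<partition>.ini from dumpConf, plan_<stage>_<partition>.json for the Substrait plan, and one data_<stage>_<partition>_<idx>.parquet per input iterator. A small sketch of the path scheme, assuming fileIdentifier has the form "_<stageId>_<partitionId>" (inferred from the file names in the workflow change; its construction is outside this hunk):

#include <iostream>
#include <string>

// Assumed shape of fileIdentifier, matching conf_12_0.ini / data_12_0_0.parquet
// in the updated CI step; the real construction is not shown in this hunk.
std::string fileIdentifier(int stageId, int partitionId) {
  return "_" + std::to_string(stageId) + "_" + std::to_string(partitionId);
}

int main() {
  const std::string saveDir = "/tmp/gluten-dump"; // hypothetical save dir
  const std::string id = fileIdentifier(12, 0);
  std::cout << saveDir + "/conf" + id + ".ini\n";  // conf_12_0.ini
  std::cout << saveDir + "/plan" + id + ".json\n"; // plan_12_0.json
  for (int idx = 0; idx < 2; ++idx) {              // one data file per input iterator
    std::cout << saveDir + "/data" + id + "_" + std::to_string(idx) + ".parquet\n";
  }
  return 0;
}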
1 change: 0 additions & 1 deletion cpp/core/memory/ColumnarBatch.h
@@ -23,7 +23,6 @@
#include "arrow/c/helpers.h"
#include "arrow/record_batch.h"
#include "memory/MemoryManager.h"
#include "operators/writer/ArrowWriter.h"
#include "utils/ArrowStatus.h"
#include "utils/Exception.h"

9 changes: 8 additions & 1 deletion cpp/core/operators/writer/ArrowWriter.cc
@@ -21,6 +21,7 @@
#include "arrow/table.h"
#include "arrow/util/type_fwd.h"

namespace gluten {
arrow::Status ArrowWriter::initWriter(arrow::Schema& schema) {
if (writer_ != nullptr) {
return arrow::Status::OK();
@@ -50,9 +51,15 @@ arrow::Status ArrowWriter::writeInBatches(std::shared_ptr<arrow::RecordBatch> ba
}

arrow::Status ArrowWriter::closeWriter() {
// Write file footer and close
// Write file footer and close.
if (writer_ != nullptr) {
ARROW_RETURN_NOT_OK(writer_->Close());
}
closed_ = true;
return arrow::Status::OK();
}

bool ArrowWriter::closed() const {
return closed_;
}
} // namespace gluten
16 changes: 13 additions & 3 deletions cpp/core/operators/writer/ArrowWriter.h
@@ -17,23 +17,33 @@

#pragma once

#include "parquet/arrow/writer.h"
#include <parquet/arrow/writer.h>
#include "memory/ColumnarBatch.h"

namespace gluten {
/**
* @brief Used to print RecordBatch to a parquet file
*
*/
class ArrowWriter {
public:
explicit ArrowWriter(std::string& path) : path_(path) {}
explicit ArrowWriter(const std::string& path) : path_(path) {}

virtual ~ArrowWriter() = default;

arrow::Status initWriter(arrow::Schema& schema);

arrow::Status writeInBatches(std::shared_ptr<arrow::RecordBatch> batch);

arrow::Status closeWriter();

private:
bool closed() const;

virtual std::shared_ptr<ColumnarBatch> retrieveColumnarBatch() = 0;

protected:
std::unique_ptr<parquet::arrow::FileWriter> writer_;
std::string path_;
bool closed_{false};
};
} // namespace gluten
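ArrowWriter is now an abstract base: the generic Parquet write path (initWriter, writeInBatches, closeWriter) stays here, while the pure-virtual retrieveColumnarBatch() leaves the read-back to each backend; the Velox side supplies it via the new VeloxArrowWriter listed in the CMake change below. A minimal write-side usage sketch, assuming a hypothetical subclass that stubs out the read-back:

#include <arrow/api.h>

#include "operators/writer/ArrowWriter.h"

// Hypothetical subclass for illustration only: it satisfies the pure virtual
// but does not implement read-back (VeloxArrowWriter does that for Velox).
class NoReplayArrowWriter : public gluten::ArrowWriter {
 public:
  using gluten::ArrowWriter::ArrowWriter;
  std::shared_ptr<gluten::ColumnarBatch> retrieveColumnarBatch() override {
    return nullptr; // read-back not exercised in this sketch
  }
};

arrow::Status dumpOneBatch(const std::string& path) {
  // Build a trivial single-column batch to dump.
  arrow::Int64Builder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 3}));
  std::shared_ptr<arrow::Array> col;
  ARROW_RETURN_NOT_OK(builder.Finish(&col));
  auto schema = arrow::schema({arrow::field("l_orderkey", arrow::int64())});
  auto batch = arrow::RecordBatch::Make(schema, col->length(), {col});

  NoReplayArrowWriter writer(path);
  ARROW_RETURN_NOT_OK(writer.initWriter(*schema)); // opens the Parquet file lazily
  ARROW_RETURN_NOT_OK(writer.writeInBatches(batch));
  return writer.closeWriter(); // writes the footer and marks the writer closed
}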
3 changes: 3 additions & 0 deletions cpp/velox/CMakeLists.txt
@@ -169,9 +169,12 @@ set(VELOX_SRCS
operators/functions/RegistrationAllFunctions.cc
operators/functions/RowConstructorWithNull.cc
operators/functions/SparkExprToSubfieldFilterParser.cc
operators/reader/FileReaderIterator.cc
operators/reader/ParquetReaderIterator.cc
operators/serializer/VeloxColumnarToRowConverter.cc
operators/serializer/VeloxColumnarBatchSerializer.cc
operators/serializer/VeloxRowToColumnarConverter.cc
operators/writer/VeloxArrowWriter.cc
operators/writer/VeloxParquetDataSource.cc
shuffle/VeloxShuffleReader.cc
shuffle/VeloxShuffleWriter.cc
7 changes: 1 addition & 6 deletions cpp/velox/benchmarks/CMakeLists.txt
@@ -15,8 +15,7 @@

find_arrow_lib(${PARQUET_LIB_NAME})

set(VELOX_BENCHMARK_COMMON_SRCS common/FileReaderIterator.cc
common/BenchmarkUtils.cc)
set(VELOX_BENCHMARK_COMMON_SRCS common/BenchmarkUtils.cc)
add_library(velox_benchmark_common STATIC ${VELOX_BENCHMARK_COMMON_SRCS})
target_include_directories(
velox_benchmark_common PUBLIC ${CMAKE_SOURCE_DIR}/velox
@@ -38,7 +37,3 @@ add_velox_benchmark(columnar_to_row_benchmark ColumnarToRowBenchmark.cc)
add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc)

add_velox_benchmark(plan_validator_util PlanValidatorUtil.cc)

if(ENABLE_ORC)
add_velox_benchmark(orc_converter exec/OrcConverter.cc)
endif()