From 58e7d831e7a18d707576a983f2b49d91b0b3d67f Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Mon, 25 Nov 2024 09:19:52 +0800 Subject: [PATCH] [GLUTEN-7953][VL] Fetch and dump all inputs for micro benchmark on middle stage begin (#7998) --- .github/workflows/velox_backend.yml | 8 +- .../NativeBenchmarkPlanGenerator.scala | 98 ++++++---------- cpp/CMakeLists.txt | 4 - cpp/core/compute/Runtime.h | 3 + cpp/core/jni/JniCommon.cc | 44 ++++--- cpp/core/jni/JniCommon.h | 1 + cpp/core/jni/JniWrapper.cc | 14 ++- cpp/core/memory/ColumnarBatch.h | 1 - cpp/core/operators/writer/ArrowWriter.cc | 9 +- cpp/core/operators/writer/ArrowWriter.h | 16 ++- cpp/velox/CMakeLists.txt | 3 + cpp/velox/benchmarks/CMakeLists.txt | 7 +- cpp/velox/benchmarks/GenericBenchmark.cc | 34 ++---- .../benchmarks/common/OrcReaderIterator.h | 107 ------------------ .../benchmarks/common/ParquetReaderIterator.h | 104 ----------------- cpp/velox/benchmarks/exec/OrcConverter.cc | 2 +- cpp/velox/compute/VeloxRuntime.cc | 14 ++- cpp/velox/compute/VeloxRuntime.h | 2 + .../reader}/FileReaderIterator.cc | 41 ++++--- .../reader}/FileReaderIterator.h | 28 ++--- .../operators/reader/ParquetReaderIterator.cc | 97 ++++++++++++++++ .../operators/reader/ParquetReaderIterator.h | 65 +++++++++++ .../VeloxColumnarBatchSerializer.cc | 3 +- .../operators/writer/VeloxArrowWriter.cc | 38 +++++++ cpp/velox/operators/writer/VeloxArrowWriter.h | 35 ++++++ cpp/velox/tests/RuntimeTest.cc | 4 + .../tests/VeloxColumnarBatchSerializerTest.cc | 3 +- cpp/velox/utils/VeloxArrowUtils.cc | 3 + docs/developers/MicroBenchmarks.md | 14 +-- .../org/apache/gluten/utils/DebugUtil.scala | 38 ++++--- .../org/apache/gluten/GlutenConfig.scala | 21 ++-- 31 files changed, 456 insertions(+), 405 deletions(-) delete mode 100644 cpp/velox/benchmarks/common/OrcReaderIterator.h delete mode 100644 cpp/velox/benchmarks/common/ParquetReaderIterator.h rename cpp/velox/{benchmarks/common => operators/reader}/FileReaderIterator.cc (55%) rename cpp/velox/{benchmarks/common => operators/reader}/FileReaderIterator.h (69%) create mode 100644 cpp/velox/operators/reader/ParquetReaderIterator.cc create mode 100644 cpp/velox/operators/reader/ParquetReaderIterator.h create mode 100644 cpp/velox/operators/writer/VeloxArrowWriter.cc create mode 100644 cpp/velox/operators/writer/VeloxArrowWriter.h diff --git a/.github/workflows/velox_backend.yml b/.github/workflows/velox_backend.yml index 42e1f2527816..d7445d1a2752 100644 --- a/.github/workflows/velox_backend.yml +++ b/.github/workflows/velox_backend.yml @@ -969,9 +969,11 @@ jobs: run: | $MVN_CMD test -Pspark-3.5 -Pbackends-velox -pl backends-velox -am \ -DtagsToInclude="org.apache.gluten.tags.GenerateExample" -Dtest=none -DfailIfNoTests=false -Dexec.skip - # This test depends on example.json generated by the above mvn test. - cd cpp/build/velox/benchmarks && sudo chmod +x ./generic_benchmark - ./generic_benchmark --run-example --with-shuffle --threads 1 --iterations 1 + # This test depends on files generated by the above mvn test. + ./cpp/build/velox/benchmarks/generic_benchmark --with-shuffle --partitioning hash --threads 1 --iterations 1 \ + --conf $(realpath backends-velox/generated-native-benchmark/conf_12_0.ini) \ + --plan $(realpath backends-velox/generated-native-benchmark/plan_12_0.json) \ + --data $(realpath backends-velox/generated-native-benchmark/data_12_0_0.parquet),$(realpath backends-velox/generated-native-benchmark/data_12_0_1.parquet) - name: Run UDF test run: | # Depends on --build_example=ON. diff --git a/backends-velox/src/test/scala/org/apache/gluten/benchmarks/NativeBenchmarkPlanGenerator.scala b/backends-velox/src/test/scala/org/apache/gluten/benchmarks/NativeBenchmarkPlanGenerator.scala index 57fbca17447a..1e378d16f14b 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/benchmarks/NativeBenchmarkPlanGenerator.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/benchmarks/NativeBenchmarkPlanGenerator.scala @@ -19,18 +19,14 @@ package org.apache.gluten.benchmarks import org.apache.gluten.GlutenConfig import org.apache.gluten.execution.{VeloxWholeStageTransformerSuite, WholeStageTransformer} -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec} +import org.apache.spark.SparkConf +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.internal.SQLConf import org.apache.commons.io.FileUtils import org.scalatest.Tag import java.io.File -import java.nio.charset.StandardCharsets -import java.nio.file.{Files, Paths} - -import scala.collection.JavaConverters._ object GenerateExample extends Tag("org.apache.gluten.tags.GenerateExample") @@ -50,6 +46,11 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite { createTPCHNotNullTables() } + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + } + test("Test plan json non-empty - AQE off") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", @@ -67,7 +68,6 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite { planJson = lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson assert(planJson.nonEmpty) } - spark.sparkContext.setLogLevel(logLevel) } test("Test plan json non-empty - AQE on") { @@ -88,70 +88,42 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite { val planJson = lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson assert(planJson.nonEmpty) } - spark.sparkContext.setLogLevel(logLevel) } test("generate example", GenerateExample) { - import testImplicits._ withSQLConf( SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", SQLConf.SHUFFLE_PARTITIONS.key -> "2", - GlutenConfig.CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT.key -> "true" + GlutenConfig.BENCHMARK_SAVE_DIR.key -> generatedPlanDir, + GlutenConfig.BENCHMARK_TASK_STAGEID.key -> "12", + GlutenConfig.BENCHMARK_TASK_PARTITIONID.key -> "0" ) { logWarning(s"Generating inputs for micro benchmark to $generatedPlanDir") - val q4_lineitem = spark - .sql(s""" - |select l_orderkey from lineitem where l_commitdate < l_receiptdate - |""".stripMargin) - val q4_orders = spark - .sql(s""" - |select o_orderkey, o_orderpriority - | from orders - | where o_orderdate >= '1993-07-01' and o_orderdate < '1993-10-01' - |""".stripMargin) - q4_lineitem - .createOrReplaceTempView("q4_lineitem") - q4_orders - .createOrReplaceTempView("q4_orders") - - q4_lineitem - .repartition(1, 'l_orderkey) - .write - .format(outputFileFormat) - .save(generatedPlanDir + "/example_lineitem") - q4_orders - .repartition(1, 'o_orderkey) - .write - .format(outputFileFormat) - .save(generatedPlanDir + "/example_orders") - - val df = - spark.sql(""" - |select * from q4_orders left semi join q4_lineitem on l_orderkey = o_orderkey - |""".stripMargin) - generateSubstraitJson(df, "example.json") + spark + .sql(""" + |select /*+ REPARTITION(1) */ + | o_orderpriority, + | count(*) as order_count + |from + | orders + |where + | o_orderdate >= date '1993-07-01' + | and o_orderdate < date '1993-07-01' + interval '3' month + | and exists ( + | select /*+ REPARTITION(1) */ + | * + | from + | lineitem + | where + | l_orderkey = o_orderkey + | and l_commitdate < l_receiptdate + | ) + |group by + | o_orderpriority + |order by + | o_orderpriority + |""".stripMargin) + .foreach(_ => ()) } - spark.sparkContext.setLogLevel(logLevel) - } - - def generateSubstraitJson(df: DataFrame, file: String): Unit = { - val executedPlan = df.queryExecution.executedPlan - executedPlan.execute() - val finalPlan = - executedPlan match { - case aqe: AdaptiveSparkPlanExec => - aqe.executedPlan match { - case s: ShuffleQueryStageExec => s.shuffle.child - case other => other - } - case plan => plan - } - val lastStageTransformer = finalPlan.find(_.isInstanceOf[WholeStageTransformer]) - assert(lastStageTransformer.nonEmpty) - val plan = - lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson.split('\n') - - val exampleJsonFile = Paths.get(generatedPlanDir, file) - Files.write(exampleJsonFile, plan.toList.asJava, StandardCharsets.UTF_8) } } diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 09d09481f216..67fb9ec721ac 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -217,10 +217,6 @@ if(ENABLE_IAA) add_definitions(-DGLUTEN_ENABLE_IAA) endif() -if(ENABLE_ORC) - add_definitions(-DGLUTEN_ENABLE_ORC) -endif() - # # Subdirectories # diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index a58f770ff74c..3090652b8148 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -27,6 +27,7 @@ #include "operators/c2r/ColumnarToRow.h" #include "operators/r2c/RowToColumnar.h" #include "operators/serializer/ColumnarBatchSerializer.h" +#include "operators/writer/ArrowWriter.h" #include "shuffle/ShuffleReader.h" #include "shuffle/ShuffleWriter.h" #include "substrait/plan.pb.h" @@ -124,6 +125,8 @@ class Runtime : public std::enable_shared_from_this { virtual void dumpConf(const std::string& path) = 0; + virtual std::shared_ptr createArrowWriter(const std::string& path) = 0; + const std::unordered_map& getConfMap() { return confMap_; } diff --git a/cpp/core/jni/JniCommon.cc b/cpp/core/jni/JniCommon.cc index 46b53e8c3b00..bb8554568ba0 100644 --- a/cpp/core/jni/JniCommon.cc +++ b/cpp/core/jni/JniCommon.cc @@ -104,22 +104,34 @@ gluten::JniColumnarBatchIterator::~JniColumnarBatchIterator() { std::shared_ptr gluten::JniColumnarBatchIterator::next() { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm_, &env); - if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) { - checkException(env); - return nullptr; // stream ended - } - - checkException(env); - jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext_); - checkException(env); - auto batch = ObjectStore::retrieve(handle); if (writer_ != nullptr) { - // save snapshot of the batch to file - std::shared_ptr schema = batch->exportArrowSchema(); - std::shared_ptr array = batch->exportArrowArray(); - auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(), schema.get())); - GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get()))); - GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb)); + if (!writer_->closed()) { + // Dump all inputs. + while (env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) { + checkException(env); + jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext_); + checkException(env); + auto batch = ObjectStore::retrieve(handle); + + // Save the snapshot of the batch to file. + std::shared_ptr schema = batch->exportArrowSchema(); + std::shared_ptr array = batch->exportArrowArray(); + auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(), schema.get())); + GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get()))); + GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb)); + } + checkException(env); + GLUTEN_THROW_NOT_OK(writer_->closeWriter()); + } + return writer_->retrieveColumnarBatch(); + } else { + if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) { + checkException(env); + return nullptr; // stream ended + } + checkException(env); + jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext_); + checkException(env); + return ObjectStore::retrieve(handle); } - return batch; } diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index aeab454f1aab..8f40398a4132 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -26,6 +26,7 @@ #include "compute/Runtime.h" #include "config/GlutenConfig.h" #include "memory/AllocationListener.h" +#include "operators/writer/ArrowWriter.h" #include "shuffle/rss/RssClient.h" #include "utils/Compression.h" #include "utils/Exception.h" diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 9da55894864e..963440f6fc16 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -373,8 +373,16 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith } saveDir = conf.at(kGlutenSaveDir); std::filesystem::path f{saveDir}; - if (!std::filesystem::exists(f)) { - throw GlutenException("Save input path " + saveDir + " does not exists"); + if (std::filesystem::exists(f)) { + if (!std::filesystem::is_directory(f)) { + throw GlutenException("Invalid path for " + kGlutenSaveDir + ": " + saveDir); + } + } else { + std::error_code ec; + std::filesystem::create_directory(f, ec); + if (ec) { + throw GlutenException("Failed to create directory: " + saveDir + ", error message: " + ec.message()); + } } ctx->dumpConf(saveDir + "/conf" + fileIdentifier + ".ini"); } @@ -407,7 +415,7 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith std::shared_ptr writer = nullptr; if (saveInput) { auto file = saveDir + "/data" + fileIdentifier + "_" + std::to_string(idx) + ".parquet"; - writer = std::make_shared(file); + writer = ctx->createArrowWriter(file); } jobject iter = env->GetObjectArrayElement(iterArr, idx); auto arrayIter = makeJniColumnarBatchIterator(env, iter, ctx, writer); diff --git a/cpp/core/memory/ColumnarBatch.h b/cpp/core/memory/ColumnarBatch.h index e0bab254189a..be487f871e74 100644 --- a/cpp/core/memory/ColumnarBatch.h +++ b/cpp/core/memory/ColumnarBatch.h @@ -23,7 +23,6 @@ #include "arrow/c/helpers.h" #include "arrow/record_batch.h" #include "memory/MemoryManager.h" -#include "operators/writer/ArrowWriter.h" #include "utils/ArrowStatus.h" #include "utils/Exception.h" diff --git a/cpp/core/operators/writer/ArrowWriter.cc b/cpp/core/operators/writer/ArrowWriter.cc index 19bab6ddcba3..46ec2fc9ba6c 100644 --- a/cpp/core/operators/writer/ArrowWriter.cc +++ b/cpp/core/operators/writer/ArrowWriter.cc @@ -21,6 +21,7 @@ #include "arrow/table.h" #include "arrow/util/type_fwd.h" +namespace gluten { arrow::Status ArrowWriter::initWriter(arrow::Schema& schema) { if (writer_ != nullptr) { return arrow::Status::OK(); @@ -50,9 +51,15 @@ arrow::Status ArrowWriter::writeInBatches(std::shared_ptr ba } arrow::Status ArrowWriter::closeWriter() { - // Write file footer and close + // Write file footer and close. if (writer_ != nullptr) { ARROW_RETURN_NOT_OK(writer_->Close()); } + closed_ = true; return arrow::Status::OK(); } + +bool ArrowWriter::closed() const { + return closed_; +} +} // namespace gluten diff --git a/cpp/core/operators/writer/ArrowWriter.h b/cpp/core/operators/writer/ArrowWriter.h index 0d0b8ce2cb58..1a7b19606624 100644 --- a/cpp/core/operators/writer/ArrowWriter.h +++ b/cpp/core/operators/writer/ArrowWriter.h @@ -17,15 +17,19 @@ #pragma once -#include "parquet/arrow/writer.h" +#include +#include "memory/ColumnarBatch.h" +namespace gluten { /** * @brief Used to print RecordBatch to a parquet file * */ class ArrowWriter { public: - explicit ArrowWriter(std::string& path) : path_(path) {} + explicit ArrowWriter(const std::string& path) : path_(path) {} + + virtual ~ArrowWriter() = default; arrow::Status initWriter(arrow::Schema& schema); @@ -33,7 +37,13 @@ class ArrowWriter { arrow::Status closeWriter(); - private: + bool closed() const; + + virtual std::shared_ptr retrieveColumnarBatch() = 0; + + protected: std::unique_ptr writer_; std::string path_; + bool closed_{false}; }; +} // namespace gluten diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index eff31863d438..9e110853eb6e 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -169,9 +169,12 @@ set(VELOX_SRCS operators/functions/RegistrationAllFunctions.cc operators/functions/RowConstructorWithNull.cc operators/functions/SparkExprToSubfieldFilterParser.cc + operators/reader/FileReaderIterator.cc + operators/reader/ParquetReaderIterator.cc operators/serializer/VeloxColumnarToRowConverter.cc operators/serializer/VeloxColumnarBatchSerializer.cc operators/serializer/VeloxRowToColumnarConverter.cc + operators/writer/VeloxArrowWriter.cc operators/writer/VeloxParquetDataSource.cc shuffle/VeloxShuffleReader.cc shuffle/VeloxShuffleWriter.cc diff --git a/cpp/velox/benchmarks/CMakeLists.txt b/cpp/velox/benchmarks/CMakeLists.txt index 1aa199b13696..6b2cda358c06 100644 --- a/cpp/velox/benchmarks/CMakeLists.txt +++ b/cpp/velox/benchmarks/CMakeLists.txt @@ -15,8 +15,7 @@ find_arrow_lib(${PARQUET_LIB_NAME}) -set(VELOX_BENCHMARK_COMMON_SRCS common/FileReaderIterator.cc - common/BenchmarkUtils.cc) +set(VELOX_BENCHMARK_COMMON_SRCS common/BenchmarkUtils.cc) add_library(velox_benchmark_common STATIC ${VELOX_BENCHMARK_COMMON_SRCS}) target_include_directories( velox_benchmark_common PUBLIC ${CMAKE_SOURCE_DIR}/velox @@ -38,7 +37,3 @@ add_velox_benchmark(columnar_to_row_benchmark ColumnarToRowBenchmark.cc) add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc) add_velox_benchmark(plan_validator_util PlanValidatorUtil.cc) - -if(ENABLE_ORC) - add_velox_benchmark(orc_converter exec/OrcConverter.cc) -endif() diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index 4e38fb44327f..dcb64d7d18d0 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -25,12 +25,12 @@ #include #include "benchmarks/common/BenchmarkUtils.h" -#include "benchmarks/common/FileReaderIterator.h" #include "compute/VeloxBackend.h" -#include "compute/VeloxPlanConverter.h" #include "compute/VeloxRuntime.h" #include "config/GlutenConfig.h" #include "config/VeloxConfig.h" +#include "operators/reader/FileReaderIterator.h" +#include "operators/writer/VeloxArrowWriter.h" #include "shuffle/LocalPartitionWriter.h" #include "shuffle/VeloxShuffleWriter.h" #include "shuffle/rss/RssPartitionWriter.h" @@ -45,7 +45,6 @@ using namespace gluten; namespace { -DEFINE_bool(run_example, false, "Run the example and exit."); DEFINE_bool(print_result, true, "Print result for execution"); DEFINE_string(save_output, "", "Path to parquet file for saving the task output iterator"); DEFINE_bool(with_shuffle, false, "Add shuffle split at end."); @@ -388,7 +387,8 @@ auto BM_Generic = [](::benchmark::State& state, std::vector inputItersRaw; if (!dataFiles.empty()) { for (const auto& input : dataFiles) { - inputIters.push_back(getInputIteratorFromFileReader(input, readerType)); + inputIters.push_back(FileReaderIterator::getInputIteratorFromFileReader( + readerType, input, FLAGS_batch_size, runtime->memoryManager()->getLeafMemoryPool().get())); } std::transform( inputIters.begin(), @@ -417,10 +417,11 @@ auto BM_Generic = [](::benchmark::State& state, ArrowSchema cSchema; toArrowSchema(veloxPlan->outputType(), runtime->memoryManager()->getLeafMemoryPool().get(), &cSchema); GLUTEN_ASSIGN_OR_THROW(auto outputSchema, arrow::ImportSchema(&cSchema)); - ArrowWriter writer{FLAGS_save_output}; + auto writer = std::make_shared( + FLAGS_save_output, FLAGS_batch_size, runtime->memoryManager()->getLeafMemoryPool().get()); state.PauseTiming(); if (!FLAGS_save_output.empty()) { - GLUTEN_THROW_NOT_OK(writer.initWriter(*(outputSchema.get()))); + GLUTEN_THROW_NOT_OK(writer->initWriter(*(outputSchema.get()))); } state.ResumeTiming(); @@ -436,13 +437,13 @@ auto BM_Generic = [](::benchmark::State& state, LOG(WARNING) << maybeBatch.ValueOrDie()->ToString(); } if (!FLAGS_save_output.empty()) { - GLUTEN_THROW_NOT_OK(writer.writeInBatches(maybeBatch.ValueOrDie())); + GLUTEN_THROW_NOT_OK(writer->writeInBatches(maybeBatch.ValueOrDie())); } } state.PauseTiming(); if (!FLAGS_save_output.empty()) { - GLUTEN_THROW_NOT_OK(writer.closeWriter()); + GLUTEN_THROW_NOT_OK(writer->closeWriter()); } state.ResumeTiming(); } @@ -488,7 +489,8 @@ auto BM_ShuffleWriteRead = [](::benchmark::State& state, { ScopedTimer timer(&elapsedTime); for (auto _ : state) { - auto resultIter = getInputIteratorFromFileReader(inputFile, readerType); + auto resultIter = FileReaderIterator::getInputIteratorFromFileReader( + readerType, inputFile, FLAGS_batch_size, runtime->memoryManager()->getLeafMemoryPool().get()); runShuffle( runtime, listenerPtr, @@ -591,19 +593,7 @@ int main(int argc, char** argv) { std::vector splitFiles{}; std::vector dataFiles{}; - if (FLAGS_run_example) { - LOG(WARNING) << "Running example..."; - dataFiles.resize(2); - try { - substraitJsonFile = getGeneratedFilePath("example.json"); - dataFiles[0] = getGeneratedFilePath("example_orders"); - dataFiles[1] = getGeneratedFilePath("example_lineitem"); - } catch (const std::exception& e) { - LOG(ERROR) << "Failed to run example. " << e.what(); - ::benchmark::Shutdown(); - std::exit(EXIT_FAILURE); - } - } else if (FLAGS_run_shuffle) { + if (FLAGS_run_shuffle) { std::string errorMsg{}; if (FLAGS_data.empty()) { errorMsg = "Missing '--split' or '--data' option."; diff --git a/cpp/velox/benchmarks/common/OrcReaderIterator.h b/cpp/velox/benchmarks/common/OrcReaderIterator.h deleted file mode 100644 index f8c9f44b2008..000000000000 --- a/cpp/velox/benchmarks/common/OrcReaderIterator.h +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include -#include "benchmarks/common/FileReaderIterator.h" - -namespace gluten { - -class OrcReaderIterator : public FileReaderIterator { - public: - explicit OrcReaderIterator(const std::string& path) : FileReaderIterator(path) {} - - void createReader() override { - // Open File - auto input = arrow::io::ReadableFile::Open(path_); - GLUTEN_THROW_NOT_OK(input); - - // Open ORC File Reader - auto maybeReader = arrow::adapters::orc::ORCFileReader::Open(*input, arrow::default_memory_pool()); - GLUTEN_THROW_NOT_OK(maybeReader); - fileReader_.reset((*maybeReader).release()); - - // get record batch Reader - auto recordBatchReader = fileReader_->GetRecordBatchReader(4096, std::vector()); - GLUTEN_THROW_NOT_OK(recordBatchReader); - recordBatchReader_ = *recordBatchReader; - } - - std::shared_ptr getSchema() override { - auto schema = fileReader_->ReadSchema(); - GLUTEN_THROW_NOT_OK(schema); - return *schema; - } - - protected: - std::unique_ptr fileReader_; - std::shared_ptr recordBatchReader_; -}; - -class OrcStreamReaderIterator final : public OrcReaderIterator { - public: - explicit OrcStreamReaderIterator(const std::string& path) : OrcReaderIterator(path) { - createReader(); - } - - std::shared_ptr next() override { - auto startTime = std::chrono::steady_clock::now(); - GLUTEN_ASSIGN_OR_THROW(auto batch, recordBatchReader_->Next()); - DLOG(INFO) << "OrcStreamReaderIterator get a batch, num rows: " << (batch ? batch->num_rows() : 0); - collectBatchTime_ += - std::chrono::duration_cast(std::chrono::steady_clock::now() - startTime).count(); - if (batch == nullptr) { - return nullptr; - } - return convertBatch(std::make_shared(batch)); - } -}; - -class OrcBufferedReaderIterator final : public OrcReaderIterator { - public: - explicit OrcBufferedReaderIterator(const std::string& path) : OrcReaderIterator(path) { - createReader(); - collectBatches(); - iter_ = batches_.begin(); - DLOG(INFO) << "OrcBufferedReaderIterator open file: " << path; - DLOG(INFO) << "Number of input batches: " << std::to_string(batches_.size()); - if (iter_ != batches_.cend()) { - DLOG(INFO) << "columns: " << (*iter_)->num_columns(); - DLOG(INFO) << "rows: " << (*iter_)->num_rows(); - } - } - - std::shared_ptr next() override { - if (iter_ == batches_.cend()) { - return nullptr; - } - return convertBatch(std::make_shared(*iter_++)); - } - - private: - void collectBatches() { - auto startTime = std::chrono::steady_clock::now(); - GLUTEN_ASSIGN_OR_THROW(batches_, recordBatchReader_->ToRecordBatches()); - auto endTime = std::chrono::steady_clock::now(); - collectBatchTime_ += std::chrono::duration_cast(endTime - startTime).count(); - } - - arrow::RecordBatchVector batches_; - std::vector>::const_iterator iter_; -}; - -} // namespace gluten \ No newline at end of file diff --git a/cpp/velox/benchmarks/common/ParquetReaderIterator.h b/cpp/velox/benchmarks/common/ParquetReaderIterator.h deleted file mode 100644 index 20652ee27dc0..000000000000 --- a/cpp/velox/benchmarks/common/ParquetReaderIterator.h +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "benchmarks/common/BenchmarkUtils.h" -#include "benchmarks/common/FileReaderIterator.h" -#include "utils/Macros.h" - -#include - -namespace gluten { - -class ParquetReaderIterator : public FileReaderIterator { - public: - explicit ParquetReaderIterator(const std::string& path) : FileReaderIterator(path) {} - - void createReader() { - parquet::ArrowReaderProperties properties = parquet::default_arrow_reader_properties(); - properties.set_batch_size(FLAGS_batch_size); - GLUTEN_THROW_NOT_OK(parquet::arrow::FileReader::Make( - arrow::default_memory_pool(), parquet::ParquetFileReader::OpenFile(path_), properties, &fileReader_)); - GLUTEN_THROW_NOT_OK( - fileReader_->GetRecordBatchReader(arrow::internal::Iota(fileReader_->num_row_groups()), &recordBatchReader_)); - - auto schema = recordBatchReader_->schema(); - LOG(INFO) << "schema:\n" << schema->ToString(); - } - - std::shared_ptr getSchema() override { - return recordBatchReader_->schema(); - } - - protected: - std::unique_ptr fileReader_; - std::shared_ptr recordBatchReader_; -}; - -class ParquetStreamReaderIterator final : public ParquetReaderIterator { - public: - explicit ParquetStreamReaderIterator(const std::string& path) : ParquetReaderIterator(path) { - createReader(); - DLOG(INFO) << "ParquetStreamReaderIterator open file: " << path; - } - - std::shared_ptr next() override { - auto startTime = std::chrono::steady_clock::now(); - GLUTEN_ASSIGN_OR_THROW(auto batch, recordBatchReader_->Next()); - DLOG(INFO) << "ParquetStreamReaderIterator get a batch, num rows: " << (batch ? batch->num_rows() : 0); - collectBatchTime_ += - std::chrono::duration_cast(std::chrono::steady_clock::now() - startTime).count(); - if (batch == nullptr) { - return nullptr; - } - return convertBatch(std::make_shared(batch)); - } -}; - -class ParquetBufferedReaderIterator final : public ParquetReaderIterator { - public: - explicit ParquetBufferedReaderIterator(const std::string& path) : ParquetReaderIterator(path) { - createReader(); - collectBatches(); - iter_ = batches_.begin(); - DLOG(INFO) << "ParquetBufferedReaderIterator open file: " << path; - DLOG(INFO) << "Number of input batches: " << std::to_string(batches_.size()); - if (iter_ != batches_.cend()) { - DLOG(INFO) << "columns: " << (*iter_)->num_columns(); - DLOG(INFO) << "rows: " << (*iter_)->num_rows(); - } - } - - std::shared_ptr next() override { - if (iter_ == batches_.cend()) { - return nullptr; - } - return convertBatch(std::make_shared(*iter_++)); - } - - private: - void collectBatches() { - auto startTime = std::chrono::steady_clock::now(); - GLUTEN_ASSIGN_OR_THROW(batches_, recordBatchReader_->ToRecordBatches()); - auto endTime = std::chrono::steady_clock::now(); - collectBatchTime_ += std::chrono::duration_cast(endTime - startTime).count(); - } - - arrow::RecordBatchVector batches_; - std::vector>::const_iterator iter_; -}; - -} // namespace gluten diff --git a/cpp/velox/benchmarks/exec/OrcConverter.cc b/cpp/velox/benchmarks/exec/OrcConverter.cc index b421ecca3b37..888cf27c35fe 100644 --- a/cpp/velox/benchmarks/exec/OrcConverter.cc +++ b/cpp/velox/benchmarks/exec/OrcConverter.cc @@ -16,7 +16,7 @@ */ #include -#include "benchmarks/common/ParquetReaderIterator.h" +#include "operators/reader/ParquetReaderIterator.h" namespace gluten { diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 4c6b52e6fe04..20c3dec939a0 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -28,16 +28,14 @@ #include "compute/VeloxPlanConverter.h" #include "config/VeloxConfig.h" #include "operators/serializer/VeloxRowToColumnarConverter.h" -#include "shuffle/VeloxHashShuffleWriter.h" -#include "shuffle/VeloxRssSortShuffleWriter.h" +#include "operators/writer/VeloxArrowWriter.h" #include "shuffle/VeloxShuffleReader.h" +#include "shuffle/VeloxShuffleWriter.h" #include "utils/ConfigExtractor.h" #include "utils/VeloxArrowUtils.h" #ifdef ENABLE_HDFS - #include "operators/writer/VeloxParquetDataSourceHDFS.h" - #endif #ifdef ENABLE_S3 @@ -308,4 +306,12 @@ void VeloxRuntime::dumpConf(const std::string& path) { outFile.close(); } +std::shared_ptr VeloxRuntime::createArrowWriter(const std::string& path) { + int64_t batchSize = 4096; + if (auto it = confMap_.find(kSparkBatchSize); it != confMap_.end()) { + batchSize = std::atol(it->second.c_str()); + } + return std::make_shared(path, batchSize, memoryManager()->getLeafMemoryPool().get()); +} + } // namespace gluten diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index 846f740cb84c..798fa5bc72f7 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -76,6 +76,8 @@ class VeloxRuntime final : public Runtime { void dumpConf(const std::string& path) override; + std::shared_ptr createArrowWriter(const std::string& path) override; + std::shared_ptr createDataSource(const std::string& filePath, std::shared_ptr schema); std::shared_ptr getVeloxPlan() { diff --git a/cpp/velox/benchmarks/common/FileReaderIterator.cc b/cpp/velox/operators/reader/FileReaderIterator.cc similarity index 55% rename from cpp/velox/benchmarks/common/FileReaderIterator.cc rename to cpp/velox/operators/reader/FileReaderIterator.cc index 26985c7f03c4..d732adbf33c0 100644 --- a/cpp/velox/benchmarks/common/FileReaderIterator.cc +++ b/cpp/velox/operators/reader/FileReaderIterator.cc @@ -15,33 +15,38 @@ * limitations under the License. */ -#include "FileReaderIterator.h" -#include "benchmarks/common/ParquetReaderIterator.h" -#ifdef GLUTEN_ENABLE_ORC -#include "benchmarks/common/OrcReaderIterator.h" -#endif +#include "operators/reader/FileReaderIterator.h" +#include +#include "operators/reader/ParquetReaderIterator.h" -std::shared_ptr gluten::getInputIteratorFromFileReader( +namespace gluten { +namespace { +const std::string kParquetSuffix = ".parquet"; +} + +FileReaderIterator::FileReaderIterator(const std::string& path) : path_(path) {} + +int64_t FileReaderIterator::getCollectBatchTime() const { + return collectBatchTime_; +} + +std::shared_ptr FileReaderIterator::getInputIteratorFromFileReader( + FileReaderType readerType, const std::string& path, - gluten::FileReaderType readerType) { + int64_t batchSize, + facebook::velox::memory::MemoryPool* pool) { std::filesystem::path input{path}; auto suffix = input.extension().string(); if (suffix == kParquetSuffix) { if (readerType == FileReaderType::kStream) { - return std::make_shared(std::make_unique(path)); - } - if (readerType == FileReaderType::kBuffered) { - return std::make_shared(std::make_unique(path)); - } - } else if (suffix == kOrcSuffix) { -#ifdef GLUTEN_ENABLE_ORC - if (readerType == FileReaderType::kStream) { - return std::make_shared(std::make_unique(path)); + return std::make_shared( + std::make_unique(path, batchSize, pool)); } if (readerType == FileReaderType::kBuffered) { - return std::make_shared(std::make_unique(path)); + return std::make_shared( + std::make_unique(path, batchSize, pool)); } -#endif } throw new GlutenException("Unreachable."); } +} // namespace gluten diff --git a/cpp/velox/benchmarks/common/FileReaderIterator.h b/cpp/velox/operators/reader/FileReaderIterator.h similarity index 69% rename from cpp/velox/benchmarks/common/FileReaderIterator.h rename to cpp/velox/operators/reader/FileReaderIterator.h index 16db58ce4569..e782c2bf80f4 100644 --- a/cpp/velox/benchmarks/common/FileReaderIterator.h +++ b/cpp/velox/operators/reader/FileReaderIterator.h @@ -14,43 +14,35 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#pragma once -#include -#include -#include +#pragma once -#include "BenchmarkUtils.h" #include "compute/ResultIterator.h" -#include "memory/ColumnarBatch.h" #include "memory/ColumnarBatchIterator.h" +#include "velox/common/memory/MemoryPool.h" namespace gluten { - -static const std::string kOrcSuffix = ".orc"; -static const std::string kParquetSuffix = ".parquet"; - enum FileReaderType { kBuffered, kStream, kNone }; class FileReaderIterator : public ColumnarBatchIterator { public: - explicit FileReaderIterator(const std::string& path) : path_(path) {} + static std::shared_ptr getInputIteratorFromFileReader( + FileReaderType readerType, + const std::string& path, + int64_t batchSize, + facebook::velox::memory::MemoryPool* pool); + + explicit FileReaderIterator(const std::string& path); virtual ~FileReaderIterator() = default; virtual std::shared_ptr getSchema() = 0; - int64_t getCollectBatchTime() const { - return collectBatchTime_; - } + int64_t getCollectBatchTime() const; protected: int64_t collectBatchTime_ = 0; std::string path_; }; -std::shared_ptr getInputIteratorFromFileReader( - const std::string& path, - FileReaderType readerType); - } // namespace gluten diff --git a/cpp/velox/operators/reader/ParquetReaderIterator.cc b/cpp/velox/operators/reader/ParquetReaderIterator.cc new file mode 100644 index 000000000000..3e61e1d8d936 --- /dev/null +++ b/cpp/velox/operators/reader/ParquetReaderIterator.cc @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "operators/reader/ParquetReaderIterator.h" +#include "memory/VeloxColumnarBatch.h" + +#include + +namespace gluten { + +ParquetReaderIterator::ParquetReaderIterator( + const std::string& path, + int64_t batchSize, + facebook::velox::memory::MemoryPool* pool) + : FileReaderIterator(path), batchSize_(batchSize), pool_(pool) {} + +void ParquetReaderIterator::createReader() { + parquet::ArrowReaderProperties properties = parquet::default_arrow_reader_properties(); + properties.set_batch_size(batchSize_); + GLUTEN_THROW_NOT_OK(parquet::arrow::FileReader::Make( + arrow::default_memory_pool(), parquet::ParquetFileReader::OpenFile(path_), properties, &fileReader_)); + GLUTEN_THROW_NOT_OK( + fileReader_->GetRecordBatchReader(arrow::internal::Iota(fileReader_->num_row_groups()), &recordBatchReader_)); + + auto schema = recordBatchReader_->schema(); + DLOG(INFO) << "Schema:\n" << schema->ToString(); +} + +std::shared_ptr ParquetReaderIterator::getSchema() { + return recordBatchReader_->schema(); +} + +ParquetStreamReaderIterator::ParquetStreamReaderIterator( + const std::string& path, + int64_t batchSize, + facebook::velox::memory::MemoryPool* pool) + : ParquetReaderIterator(path, batchSize, pool) { + createReader(); + DLOG(INFO) << "ParquetStreamReaderIterator open file: " << path; +} + +std::shared_ptr ParquetStreamReaderIterator::next() { + auto startTime = std::chrono::steady_clock::now(); + GLUTEN_ASSIGN_OR_THROW(auto batch, recordBatchReader_->Next()); + DLOG(INFO) << "ParquetStreamReaderIterator get a batch, num rows: " << (batch ? batch->num_rows() : 0); + collectBatchTime_ += + std::chrono::duration_cast(std::chrono::steady_clock::now() - startTime).count(); + if (batch == nullptr) { + return nullptr; + } + return VeloxColumnarBatch::from(pool_, std::make_shared(batch)); +} + +ParquetBufferedReaderIterator::ParquetBufferedReaderIterator( + const std::string& path, + int64_t batchSize, + facebook::velox::memory::MemoryPool* pool) + : ParquetReaderIterator(path, batchSize, pool) { + createReader(); + collectBatches(); + iter_ = batches_.begin(); + DLOG(INFO) << "ParquetBufferedReaderIterator open file: " << path; + DLOG(INFO) << "Number of input batches: " << std::to_string(batches_.size()); + if (iter_ != batches_.cend()) { + DLOG(INFO) << "columns: " << (*iter_)->num_columns(); + DLOG(INFO) << "rows: " << (*iter_)->num_rows(); + } +} + +std::shared_ptr ParquetBufferedReaderIterator::next() { + if (iter_ == batches_.cend()) { + return nullptr; + } + return VeloxColumnarBatch::from(pool_, std::make_shared(*iter_++)); +} + +void ParquetBufferedReaderIterator::collectBatches() { + auto startTime = std::chrono::steady_clock::now(); + GLUTEN_ASSIGN_OR_THROW(batches_, recordBatchReader_->ToRecordBatches()); + auto endTime = std::chrono::steady_clock::now(); + collectBatchTime_ += std::chrono::duration_cast(endTime - startTime).count(); +} +} // namespace gluten \ No newline at end of file diff --git a/cpp/velox/operators/reader/ParquetReaderIterator.h b/cpp/velox/operators/reader/ParquetReaderIterator.h new file mode 100644 index 000000000000..f45fe5eb7721 --- /dev/null +++ b/cpp/velox/operators/reader/ParquetReaderIterator.h @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "operators/reader/FileReaderIterator.h" + +#include +#include + +namespace gluten { + +class ParquetReaderIterator : public FileReaderIterator { + public: + explicit ParquetReaderIterator(const std::string& path, int64_t batchSize, facebook::velox::memory::MemoryPool* pool); + + void createReader(); + + std::shared_ptr getSchema() override; + + protected: + std::unique_ptr<::parquet::arrow::FileReader> fileReader_; + std::shared_ptr recordBatchReader_; + int64_t batchSize_; + facebook::velox::memory::MemoryPool* pool_; +}; + +class ParquetStreamReaderIterator final : public ParquetReaderIterator { + public: + ParquetStreamReaderIterator(const std::string& path, int64_t batchSize, facebook::velox::memory::MemoryPool* pool); + + std::shared_ptr next() override; +}; + +class ParquetBufferedReaderIterator final : public ParquetReaderIterator { + public: + explicit ParquetBufferedReaderIterator( + const std::string& path, + int64_t batchSize, + facebook::velox::memory::MemoryPool* pool); + + std::shared_ptr next() override; + + private: + void collectBatches(); + + arrow::RecordBatchVector batches_; + std::vector>::const_iterator iter_; +}; + +} // namespace gluten diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc index acb14cf4de39..9c5d166a07da 100644 --- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc +++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc @@ -17,10 +17,11 @@ #include "VeloxColumnarBatchSerializer.h" +#include + #include "memory/ArrowMemory.h" #include "memory/VeloxColumnarBatch.h" #include "velox/common/memory/Memory.h" -#include "velox/vector/ComplexVector.h" #include "velox/vector/FlatVector.h" #include "velox/vector/arrow/Bridge.h" diff --git a/cpp/velox/operators/writer/VeloxArrowWriter.cc b/cpp/velox/operators/writer/VeloxArrowWriter.cc new file mode 100644 index 000000000000..565602d95cc9 --- /dev/null +++ b/cpp/velox/operators/writer/VeloxArrowWriter.cc @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "operators/writer/VeloxArrowWriter.h" + +namespace gluten { + +VeloxArrowWriter::VeloxArrowWriter( + const std::string& path, + int64_t batchSize, + facebook::velox::memory::MemoryPool* pool) + : ArrowWriter(path), batchSize_(batchSize), pool_(pool) {} + +std::shared_ptr VeloxArrowWriter::retrieveColumnarBatch() { + if (writer_ == nullptr) { + // No data to read. + return nullptr; + } + if (reader_ == nullptr) { + reader_ = std::make_unique(path_, batchSize_, pool_); + } + return reader_->next(); +} +} // namespace gluten diff --git a/cpp/velox/operators/writer/VeloxArrowWriter.h b/cpp/velox/operators/writer/VeloxArrowWriter.h new file mode 100644 index 000000000000..8b7998628716 --- /dev/null +++ b/cpp/velox/operators/writer/VeloxArrowWriter.h @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "operators/reader/ParquetReaderIterator.h" +#include "operators/writer/ArrowWriter.h" + +namespace gluten { +class VeloxArrowWriter : public ArrowWriter { + public: + explicit VeloxArrowWriter(const std::string& path, int64_t batchSize, facebook::velox::memory::MemoryPool* pool); + + std::shared_ptr retrieveColumnarBatch() override; + + private: + int64_t batchSize_; + facebook::velox::memory::MemoryPool* pool_; + std::unique_ptr reader_{nullptr}; +}; +} // namespace gluten diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc index 4b5bcdf81919..b4944d92054b 100644 --- a/cpp/velox/tests/RuntimeTest.cc +++ b/cpp/velox/tests/RuntimeTest.cc @@ -103,6 +103,10 @@ class DummyRuntime final : public Runtime { throw GlutenException("Not yet implemented"); } + std::shared_ptr createArrowWriter(const std::string& path) override { + throw GlutenException("Not yet implemented"); + } + private: class DummyResultIterator : public ColumnarBatchIterator { public: diff --git a/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc b/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc index ffa6f032ac44..c00ab6a14844 100644 --- a/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc +++ b/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc @@ -19,12 +19,13 @@ #include "memory/ArrowMemoryPool.h" #include "memory/VeloxColumnarBatch.h" -#include "memory/VeloxMemoryManager.h" #include "operators/serializer/VeloxColumnarBatchSerializer.h" #include "utils/VeloxArrowUtils.h" #include "velox/vector/arrow/Bridge.h" #include "velox/vector/tests/utils/VectorTestBase.h" +#include + using namespace facebook::velox; namespace gluten { diff --git a/cpp/velox/utils/VeloxArrowUtils.cc b/cpp/velox/utils/VeloxArrowUtils.cc index 0349eb718b8e..f26b49a4768d 100644 --- a/cpp/velox/utils/VeloxArrowUtils.cc +++ b/cpp/velox/utils/VeloxArrowUtils.cc @@ -16,6 +16,9 @@ */ #include "utils/VeloxArrowUtils.h" + +#include + #include "memory/VeloxColumnarBatch.h" #include "utils/Common.h" #include "velox/vector/ComplexVector.h" diff --git a/docs/developers/MicroBenchmarks.md b/docs/developers/MicroBenchmarks.md index eedf5010b634..1483dc2cbaff 100644 --- a/docs/developers/MicroBenchmarks.md +++ b/docs/developers/MicroBenchmarks.md @@ -64,7 +64,7 @@ cd /path/to/gluten/cpp/build/velox/benchmarks --plan /home/sparkuser/github/apache/incubator-gluten/backends-velox/generated-native-benchmark/example.json \ --data /home/sparkuser/github/apache/incubator-gluten/backends-velox/generated-native-benchmark/example_orders/part-00000-1e66fb98-4dd6-47a6-8679-8625dbc437ee-c000.snappy.parquet,\ /home/sparkuser/github/apache/incubator-gluten/backends-velox/generated-native-benchmark/example_lineitem/part-00000-3ec19189-d20e-4240-85ae-88631d46b612-c000.snappy.parquet \ ---threads 1 --iterations 1 --noprint-result --benchmark_filter=InputFromBatchStream +--threads 1 --iterations 1 --noprint-result ``` The output should be like: @@ -118,12 +118,12 @@ cd /path/to/gluten/ First, get the Stage Id from spark UI for the stage you want to simulate. And then re-run the query with below configurations to dump the inputs to micro benchmark. -| Parameters | Description | Recommend Setting | -|---------------------------------------------|----------------------------------------------------------------------------------------------------------------|-----------------------| -| spark.gluten.sql.benchmark_task.stageId | Spark task stage id | target stage id | -| spark.gluten.sql.benchmark_task.partitionId | Spark task partition id, default value -1 means all the partition of this stage | 0 | -| spark.gluten.sql.benchmark_task.taskId | If not specify partition id, use spark task attempt id, default value -1 means all the partition of this stage | target task attemp id | -| spark.gluten.saveDir | Directory to save the inputs to micro benchmark, should exist and be empty. | /path/to/saveDir | +| Parameters | Description | Recommend Setting | +|---------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------| +| spark.gluten.sql.benchmark_task.taskId | Comma-separated string to specify the Task IDs to dump. If it's set, `spark.gluten.sql.benchmark_task.stageId` and `spark.gluten.sql.benchmark_task.partitionId` will be ignored. | Comma-separated string of task IDs. Empty by default. | +| spark.gluten.sql.benchmark_task.stageId | Spark stage ID. | Target stage ID | +| spark.gluten.sql.benchmark_task.partitionId | Comma-separated string to specify the Partition IDs in a stage to dump. Must be specified together with `spark.gluten.sql.benchmark_task.stageId`. Empty by default, meaning all partitions of this stage will be dumped. To identify the partition ID, navigate to the `Stage` tab in the Spark UI and locate it under the `Index` column. | Comma-separated string of partition IDs. Empty by default. | +| spark.gluten.saveDir | Directory to save the inputs to micro benchmark, should exist and be empty. | /path/to/saveDir | Check the files in `spark.gluten.saveDir`. If the simulated stage is a first stage, you will get 3 or 4 types of dumped file: diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala b/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala index dea0a4814028..f9bb7478e7d0 100644 --- a/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala +++ b/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala @@ -21,24 +21,32 @@ import org.apache.gluten.GlutenConfig import org.apache.spark.TaskContext object DebugUtil { - // if specify taskId, then only do that task partition - // if not specify stageId, then do nothing + // if taskId is specified and matches, then do that task + // if stageId is not specified or doesn't match, then do nothing // if specify stageId but no partitionId, then do all partitions for that stage // if specify stageId and partitionId, then only do that partition for that stage def saveInputToFile(): Boolean = { - if (TaskContext.get().taskAttemptId() == GlutenConfig.getConf.taskId) { - return true - } - if (TaskContext.get().stageId() != GlutenConfig.getConf.taskStageId) { - return false - } - if (GlutenConfig.getConf.taskPartitionId == -1) { - return true - } - if (TaskContext.getPartitionId() == GlutenConfig.getConf.taskPartitionId) { - return true - } + def taskIdMatches = + GlutenConfig.getConf.benchmarkTaskId.nonEmpty && + GlutenConfig.getConf.benchmarkTaskId + .split(",") + .map(_.toLong) + .contains(TaskContext.get().taskAttemptId()) + + def partitionIdMatches = + TaskContext.get().stageId() == GlutenConfig.getConf.benchmarkStageId && + (GlutenConfig.getConf.benchmarkPartitionId.isEmpty || + GlutenConfig.getConf.benchmarkPartitionId + .split(",") + .map(_.toInt) + .contains(TaskContext.get().partitionId())) - false + val saveInput = taskIdMatches || partitionIdMatches + if (saveInput) { + if (GlutenConfig.getConf.benchmarkSaveDir.isEmpty) { + throw new IllegalArgumentException(GlutenConfig.BENCHMARK_SAVE_DIR.key + " is not set.") + } + } + saveInput } } diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index f756eb20a6d3..e0d06ce6fc0f 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -423,9 +423,10 @@ class GlutenConfig(conf: SQLConf) extends Logging { def debug: Boolean = conf.getConf(DEBUG_ENABLED) def debugKeepJniWorkspace: Boolean = conf.getConf(DEBUG_KEEP_JNI_WORKSPACE) def collectUtStats: Boolean = conf.getConf(UT_STATISTIC) - def taskStageId: Int = conf.getConf(BENCHMARK_TASK_STAGEID) - def taskPartitionId: Int = conf.getConf(BENCHMARK_TASK_PARTITIONID) - def taskId: Long = conf.getConf(BENCHMARK_TASK_TASK_ID) + def benchmarkStageId: Int = conf.getConf(BENCHMARK_TASK_STAGEID) + def benchmarkPartitionId: String = conf.getConf(BENCHMARK_TASK_PARTITIONID) + def benchmarkTaskId: String = conf.getConf(BENCHMARK_TASK_TASK_ID) + def benchmarkSaveDir: String = conf.getConf(BENCHMARK_SAVE_DIR) def textInputMaxBlockSize: Long = conf.getConf(TEXT_INPUT_ROW_MAX_BLOCK_SIZE) def textIputEmptyAsDefault: Boolean = conf.getConf(TEXT_INPUT_EMPTY_AS_DEFAULT) def enableParquetRowGroupMaxMinIndex: Boolean = @@ -1719,14 +1720,20 @@ object GlutenConfig { val BENCHMARK_TASK_PARTITIONID = buildConf("spark.gluten.sql.benchmark_task.partitionId") .internal() - .intConf - .createWithDefault(-1) + .stringConf + .createWithDefault("") val BENCHMARK_TASK_TASK_ID = buildConf("spark.gluten.sql.benchmark_task.taskId") .internal() - .longConf - .createWithDefault(-1L) + .stringConf + .createWithDefault("") + + val BENCHMARK_SAVE_DIR = + buildConf(GLUTEN_SAVE_DIR) + .internal() + .stringConf + .createWithDefault("") val NATIVE_WRITER_ENABLED = buildConf("spark.gluten.sql.native.writer.enabled")