From 62490ac045b7657a38428d008de8e8bdeaa00f4e Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Fri, 13 Dec 2024 09:13:52 +0000 Subject: [PATCH] respect config kSpillReadBufferSize and add spill compression codec --- cpp/core/config/GlutenConfig.h | 1 + cpp/velox/compute/WholeStageResultIterator.cc | 12 +++++++--- cpp/velox/config/VeloxConfig.h | 8 +++++-- .../org/apache/gluten/GlutenConfig.scala | 24 +++++++++++-------- 4 files changed, 30 insertions(+), 15 deletions(-) diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h index 5a61b27a80b6..3207d3bec5dc 100644 --- a/cpp/core/config/GlutenConfig.h +++ b/cpp/core/config/GlutenConfig.h @@ -66,6 +66,7 @@ const std::string kUGITokens = "spark.gluten.ugi.tokens"; const std::string kShuffleCompressionCodec = "spark.gluten.sql.columnar.shuffle.codec"; const std::string kShuffleCompressionCodecBackend = "spark.gluten.sql.columnar.shuffle.codecBackend"; +const std::string kShuffleSpillDiskWriteBufferSize = "spark.shuffle.spill.diskWriteBufferSize"; const std::string kQatBackendName = "qat"; const std::string kIaaBackendName = "iaa"; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index ba67c182555a..1bf783fb8c64 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -516,17 +516,23 @@ std::unordered_map WholeStageResultIterator::getQueryC configs[velox::core::QueryConfig::kMaxSpillBytes] = std::to_string(veloxCfg_->get(kMaxSpillBytes, 107374182400LL)); configs[velox::core::QueryConfig::kSpillWriteBufferSize] = - std::to_string(veloxCfg_->get(kSpillWriteBufferSize, 4L * 1024 * 1024)); + std::to_string(veloxCfg_->get(kShuffleSpillDiskWriteBufferSize, 1L * 1024 * 1024)); + configs[velox::core::QueryConfig::kSpillReadBufferSize] = + std::to_string(veloxCfg_->get(kSpillReadBufferSize, 1L * 1024 * 1024)); configs[velox::core::QueryConfig::kSpillStartPartitionBit] = std::to_string(veloxCfg_->get(kSpillStartPartitionBit, 29)); configs[velox::core::QueryConfig::kSpillNumPartitionBits] = std::to_string(veloxCfg_->get(kSpillPartitionBits, 3)); configs[velox::core::QueryConfig::kSpillableReservationGrowthPct] = std::to_string(veloxCfg_->get(kSpillableReservationGrowthPct, 25)); - configs[velox::core::QueryConfig::kSpillCompressionKind] = - veloxCfg_->get(kSpillCompressionKind, "lz4"); configs[velox::core::QueryConfig::kSpillPrefixSortEnabled] = veloxCfg_->get(kSpillPrefixSortEnabled, "false"); + if (veloxCfg_->get(kSparkShuffleSpillCompress, true)) { + configs[velox::core::QueryConfig::kSpillCompressionKind] = + veloxCfg_->get(kSpillCompressionKind, veloxCfg_->get(kCompressionKind, "lz4")); + } else { + configs[velox::core::QueryConfig::kSpillCompressionKind] = "none"; + } configs[velox::core::QueryConfig::kSparkBloomFilterExpectedNumItems] = std::to_string(veloxCfg_->get(kBloomFilterExpectedNumItems, 1000000)); configs[velox::core::QueryConfig::kSparkBloomFilterNumBits] = diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index 4ae82f263d37..f882e72065b2 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -38,13 +38,17 @@ const std::string kSpillStartPartitionBit = "spark.gluten.sql.columnar.backend.v const std::string kSpillPartitionBits = "spark.gluten.sql.columnar.backend.velox.spillPartitionBits"; const std::string kMaxSpillRunRows = "spark.gluten.sql.columnar.backend.velox.MaxSpillRunRows"; const std::string kMaxSpillBytes = "spark.gluten.sql.columnar.backend.velox.MaxSpillBytes"; -const std::string kSpillWriteBufferSize = "spark.gluten.sql.columnar.backend.velox.spillWriteBufferSize"; +const std::string kSpillReadBufferSize = "spark.unsafe.sorter.spill.reader.buffer.size"; const uint64_t kMaxSpillFileSizeDefault = 1L * 1024 * 1024 * 1024; const std::string kSpillableReservationGrowthPct = "spark.gluten.sql.columnar.backend.velox.spillableReservationGrowthPct"; -const std::string kSpillCompressionKind = "spark.io.compression.codec"; const std::string kSpillPrefixSortEnabled = "spark.gluten.sql.columnar.backend.velox.spillPrefixsortEnabled"; +// Whether to compress data spilled. Compression will use spark.io.compression.codec or kSpillCompressionKind. +const std::string kSparkShuffleSpillCompress = "spark.shuffle.spill.compress"; +const std::string kCompressionKind = "spark.io.compression.codec"; +/// The compression codec to use for spilling. Use kCompressionKind if not set. +const std::string kSpillCompressionKind = "spark.gluten.sql.columnar.backend.velox.spillCompressionCodec"; const std::string kMaxPartialAggregationMemoryRatio = "spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio"; const std::string kMaxExtendedPartialAggregationMemoryRatio = diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 15704f1450ee..296153346ef8 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -321,8 +321,6 @@ class GlutenConfig(conf: SQLConf) extends Logging { def veloxMaxSpillBytes: Long = conf.getConf(COLUMNAR_VELOX_MAX_SPILL_BYTES) - def veloxMaxWriteBufferSize: Long = conf.getConf(COLUMNAR_VELOX_MAX_SPILL_WRITE_BUFFER_SIZE) - def veloxBloomFilterExpectedNumItems: Long = conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS) @@ -571,6 +569,12 @@ object GlutenConfig { val SPARK_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled" val SPARK_REDACTION_REGEX = "spark.redaction.regex" val SPARK_SHUFFLE_FILE_BUFFER = "spark.shuffle.file.buffer" + val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE = "spark.unsafe.sorter.spill.reader.buffer.size" + val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024 + val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE = "spark.shuffle.spill.diskWriteBufferSize" + val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024 + val SPARK_SHUFFLE_SPILL_COMPRESS = "spark.shuffle.spill.compress" + val SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT: Boolean = true // For Soft Affinity Scheduling // Enable Soft Affinity Scheduling, default value is false @@ -734,7 +738,14 @@ object GlutenConfig { COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.defaultValueString), ( GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key, - GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString) + GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString), + ( + SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE, + SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT.toString), + ( + SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE, + SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT.toString), + (SPARK_SHUFFLE_SPILL_COMPRESS, SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT.toString) ) keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2))) @@ -1605,13 +1616,6 @@ object GlutenConfig { .bytesConf(ByteUnit.BYTE) .createWithDefaultString("100G") - val COLUMNAR_VELOX_MAX_SPILL_WRITE_BUFFER_SIZE = - buildConf("spark.gluten.sql.columnar.backend.velox.spillWriteBufferSize") - .internal() - .doc("The maximum write buffer size") - .bytesConf(ByteUnit.BYTE) - .createWithDefaultString("4M") - val MAX_PARTITION_PER_WRITERS_SESSION = buildConf("spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession") .internal()