From 3dceeb85cd8cf15a6a8630f74235033a507eea9e Mon Sep 17 00:00:00 2001 From: Zhichao Zhang Date: Tue, 8 Oct 2024 09:46:31 +0800 Subject: [PATCH] [GLUTEN-7394][CH]Reduce the times of the calling listFiles when executing query from the parquet file format (#7417) * [GLUTEN-7394][CH]Reduce the times of the calling listFiles when executing query from the mergetree file format Reduce the times of the calling listFiles when executing query from the mergetree file format Close #7394. --- .../backendsapi/clickhouse/CHBackend.scala | 22 +---------- .../clickhouse/CHIteratorApi.scala | 37 +++++++++++++++++-- .../backendsapi/velox/VeloxBackend.scala | 4 +- .../local-engine/Common/GlutenStringUtils.cpp | 34 ++--------------- .../local-engine/Common/GlutenStringUtils.h | 5 +-- .../Storages/SubstraitSource/FormatFile.cpp | 19 +++++++--- cpp-ch/local-engine/tests/gtest_utils.cpp | 31 ---------------- .../execution/IcebergScanTransformer.scala | 2 - .../backendsapi/BackendSettingsApi.scala | 8 ++-- .../gluten/execution/BaseDataSource.scala | 3 -- .../execution/BasicScanExecTransformer.scala | 17 +-------- .../execution/BatchScanExecTransformer.scala | 7 ---- .../FileSourceScanExecTransformer.scala | 4 -- .../hive/HiveTableScanExecTransformer.scala | 5 --- 14 files changed, 59 insertions(+), 139 deletions(-) delete mode 100644 cpp-ch/local-engine/tests/gtest_utils.cpp diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 725e90e2adfb..59d912d8e75d 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -145,18 +145,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { override def validateScan( format: ReadFileFormat, fields: Array[StructField], - partTable: Boolean, - rootPaths: Seq[String], - paths: Seq[String]): ValidationResult = { - - def validateFilePath: Boolean = { - // Fallback to vanilla spark when the input path - // does not contain the partition info. - if (partTable && !paths.forall(_.contains("="))) { - return false - } - true - } + rootPaths: Seq[String]): ValidationResult = { // Validate if all types are supported. def hasComplexType: Boolean = { @@ -176,12 +165,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { !unsupportedDataTypes.isEmpty } format match { - case ParquetReadFormat => - if (validateFilePath) { - ValidationResult.succeeded - } else { - ValidationResult.failed("Validate file path failed.") - } + case ParquetReadFormat => ValidationResult.succeeded case OrcReadFormat => ValidationResult.succeeded case MergeTreeReadFormat => ValidationResult.succeeded case TextReadFormat => @@ -343,8 +327,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging { override def transformCheckOverflow: Boolean = false - override def requiredInputFilePaths(): Boolean = true - override def requireBloomFilterAggMightContainJointFallback(): Boolean = false def maxShuffleReadRows(): Long = { diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 0a3dbc3f5a37..3c834b7ca847 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -34,16 +34,20 @@ import org.apache.spark.affinity.CHAffinity import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging import org.apache.spark.shuffle.CHColumnarShuffleWriter +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder, ExtensionTableNode} import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch import java.lang.{Long => JLong} import java.net.URI +import java.nio.charset.StandardCharsets +import java.time.ZoneOffset import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} import scala.collection.JavaConverters._ @@ -156,14 +160,41 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { val fileSizes = new JArrayList[JLong]() val modificationTimes = new JArrayList[JLong]() val partitionColumns = new JArrayList[JMap[String, String]] + val metadataColumns = new JArrayList[JMap[String, String]] f.files.foreach { file => paths.add(new URI(file.filePath.toString()).toASCIIString) starts.add(JLong.valueOf(file.start)) lengths.add(JLong.valueOf(file.length)) - // TODO: Support custom partition location + val metadataColumn = + SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames) + metadataColumns.add(metadataColumn) val partitionColumn = new JHashMap[String, String]() + for (i <- 0 until file.partitionValues.numFields) { + val partitionColumnValue = if (file.partitionValues.isNullAt(i)) { + ExternalCatalogUtils.DEFAULT_PARTITION_NAME + } else { + val pn = file.partitionValues.get(i, partitionSchema.fields(i).dataType) + partitionSchema.fields(i).dataType match { + case _: BinaryType => + new String(pn.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8) + case _: DateType => + DateFormatter.apply().format(pn.asInstanceOf[Integer]) + case _: DecimalType => + pn.asInstanceOf[Decimal].toJavaBigInteger.toString + case _: TimestampType => + TimestampFormatter + .getFractionFormatter(ZoneOffset.UTC) + .format(pn.asInstanceOf[java.lang.Long]) + case _ => pn.toString + } + } + partitionColumn.put( + ConverterUtils.normalizeColName(partitionSchema.names(i)), + partitionColumnValue) + } partitionColumns.add(partitionColumn) + val (fileSize, modificationTime) = SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(file) (fileSize, modificationTime) match { @@ -185,7 +216,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { fileSizes, modificationTimes, partitionColumns, - new JArrayList[JMap[String, String]](), + metadataColumns, fileFormat, preferredLocations.toList.asJava, mapAsJavaMap(properties) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 56dc92f420a3..939fc7f04fd6 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -93,9 +93,7 @@ object VeloxBackendSettings extends BackendSettingsApi { override def validateScan( format: ReadFileFormat, fields: Array[StructField], - partTable: Boolean, - rootPaths: Seq[String], - paths: Seq[String]): ValidationResult = { + rootPaths: Seq[String]): ValidationResult = { val filteredRootPaths = distinctRootPaths(rootPaths) if ( filteredRootPaths.nonEmpty && !VeloxFileSystemValidationJniWrapper diff --git a/cpp-ch/local-engine/Common/GlutenStringUtils.cpp b/cpp-ch/local-engine/Common/GlutenStringUtils.cpp index 4a18f4ceda02..858099fff920 100644 --- a/cpp-ch/local-engine/Common/GlutenStringUtils.cpp +++ b/cpp-ch/local-engine/Common/GlutenStringUtils.cpp @@ -22,48 +22,20 @@ namespace local_engine { -PartitionValues GlutenStringUtils::parsePartitionTablePath(const std::string & file) -{ - PartitionValues result; - Poco::StringTokenizer path(file, "/"); - for (const auto & item : path) - { - auto pos = item.find('='); - if (pos != std::string::npos) - { - auto key = boost::to_lower_copy(item.substr(0, pos)); - auto value = item.substr(pos + 1); - - std::string unescaped_key; - std::string unescaped_value; - Poco::URI::decode(key, unescaped_key); - Poco::URI::decode(value, unescaped_value); - result.emplace_back(std::move(unescaped_key), std::move(unescaped_value)); - } - } - return result; -} bool GlutenStringUtils::isNullPartitionValue(const std::string & value) { return value == "__HIVE_DEFAULT_PARTITION__"; } -std::string GlutenStringUtils::dumpPartitionValue(const PartitionValue & value) -{ - return value.first + "=" + value.second; -} - -std::string GlutenStringUtils::dumpPartitionValues(const PartitionValues & values) +std::string GlutenStringUtils::dumpPartitionValues(const std::map & values) { std::string res; res += "["; - for (size_t i = 0; i < values.size(); ++i) + for (const auto & [key, value] : values) { - if (i) - res += ", "; - res += dumpPartitionValue(values[i]); + res += key + "=" + value + ", "; } res += "]"; diff --git a/cpp-ch/local-engine/Common/GlutenStringUtils.h b/cpp-ch/local-engine/Common/GlutenStringUtils.h index dd044135320f..0d980f228f18 100644 --- a/cpp-ch/local-engine/Common/GlutenStringUtils.h +++ b/cpp-ch/local-engine/Common/GlutenStringUtils.h @@ -17,6 +17,7 @@ #pragma once #include #include +#include namespace local_engine { @@ -26,10 +27,8 @@ using PartitionValues = std::vector; class GlutenStringUtils { public: - static PartitionValues parsePartitionTablePath(const std::string & file); static bool isNullPartitionValue(const std::string & value); - static std::string dumpPartitionValue(const PartitionValue & value); - static std::string dumpPartitionValues(const PartitionValues & values); + static std::string dumpPartitionValues(const std::map & values); }; } diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp index 1097abe6e698..fc5acc533d59 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp @@ -51,12 +51,19 @@ FormatFile::FormatFile( const ReadBufferBuilderPtr & read_buffer_builder_) : context(context_), file_info(file_info_), read_buffer_builder(read_buffer_builder_) { - PartitionValues part_vals = GlutenStringUtils::parsePartitionTablePath(file_info.uri_file()); - for (size_t i = 0; i < part_vals.size(); ++i) + if (file_info.partition_columns_size()) { - const auto & part = part_vals[i]; - partition_keys.push_back(part.first); - partition_values[part.first] = part.second; + for (size_t i = 0; i < file_info.partition_columns_size(); ++i) + { + const auto & partition_column = file_info.partition_columns(i); + std::string unescaped_key; + std::string unescaped_value; + Poco::URI::decode(partition_column.key(), unescaped_key); + Poco::URI::decode(partition_column.value(), unescaped_value); + auto key = std::move(unescaped_key); + partition_keys.push_back(key); + partition_values[key] = std::move(unescaped_value); + } } LOG_INFO( @@ -66,7 +73,7 @@ FormatFile::FormatFile( file_info.file_format_case(), std::to_string(file_info.start()) + "-" + std::to_string(file_info.start() + file_info.length()), file_info.partition_index(), - GlutenStringUtils::dumpPartitionValues(part_vals)); + GlutenStringUtils::dumpPartitionValues(partition_values)); } FormatFilePtr FormatFileUtil::createFile( diff --git a/cpp-ch/local-engine/tests/gtest_utils.cpp b/cpp-ch/local-engine/tests/gtest_utils.cpp deleted file mode 100644 index 4ea713921f6a..000000000000 --- a/cpp-ch/local-engine/tests/gtest_utils.cpp +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include -#include - -using namespace local_engine; - -TEST(TestStringUtils, TestExtractPartitionValues) -{ - std::string path = "/tmp/col1=1/col2=test/a.parquet"; - auto values = GlutenStringUtils::parsePartitionTablePath(path); - ASSERT_EQ(2, values.size()); - ASSERT_EQ("col1", values[0].first); - ASSERT_EQ("1", values[0].second); - ASSERT_EQ("col2", values[1].first); - ASSERT_EQ("test", values[1].second); -} diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala index 60f8a60064c6..1cbeb52a9213 100644 --- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala +++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala @@ -53,8 +53,6 @@ case class IcebergScanTransformer( override def getDataSchema: StructType = new StructType() - override def getInputFilePathsInternal: Seq[String] = Seq.empty - // TODO: get root paths from table. override def getRootPathsInternal: Seq[String] = Seq.empty diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index 7d07431a87d4..f1f46dd87e17 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -33,14 +33,14 @@ trait BackendSettingsApi { def validateScan( format: ReadFileFormat, fields: Array[StructField], - partTable: Boolean, - rootPaths: Seq[String], - paths: Seq[String]): ValidationResult = ValidationResult.succeeded + rootPaths: Seq[String]): ValidationResult = ValidationResult.succeeded + def supportWriteFilesExec( format: FileFormat, fields: Array[StructField], bucketSpec: Option[BucketSpec], options: Map[String, String]): ValidationResult = ValidationResult.succeeded + def supportNativeWrite(fields: Array[StructField]): Boolean = true def supportNativeMetadataColumns(): Boolean = false def supportNativeRowIndexColumn(): Boolean = false @@ -112,8 +112,6 @@ trait BackendSettingsApi { def staticPartitionWriteOnly(): Boolean = false - def requiredInputFilePaths(): Boolean = false - // TODO: Move this to test settings as used in UT only. def requireBloomFilterAggMightContainJointFallback(): Boolean = true diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala index 1a0ff3f84567..e0621a20de9a 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala @@ -30,8 +30,5 @@ trait BaseDataSource { /** Returns the partitions generated by this data source scan. */ def getPartitions: Seq[InputPartition] - /** Returns the input file paths, used to validate the partition column path */ - def getInputFilePathsInternal: Seq[String] - def getRootPathsInternal: Seq[String] } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index 419c22d6c326..912b93079f4a 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -50,16 +50,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource /** This can be used to report FileFormat for a file based scan operator. */ val fileFormat: ReadFileFormat - // TODO: Remove this expensive call when CH support scan custom partition location. - def getInputFilePaths: Seq[String] = { - // This is a heavy operation, and only the required backend executes the corresponding logic. - if (BackendsApiManager.getSettings.requiredInputFilePaths()) { - getInputFilePathsInternal - } else { - Seq.empty - } - } - def getRootFilePaths: Seq[String] = { if (GlutenConfig.getConf.scanFileSchemeValidationEnabled) { getRootPathsInternal @@ -101,12 +91,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource } val validationResult = BackendsApiManager.getSettings - .validateScan( - fileFormat, - fields, - getPartitionSchema.nonEmpty, - getRootFilePaths, - getInputFilePaths) + .validateScan(fileFormat, fields, getRootFilePaths) if (!validationResult.ok()) { return validationResult } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala index 553c7c4e0e7a..e1a1be8e29b5 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala @@ -125,13 +125,6 @@ abstract class BatchScanExecTransformerBase( case _ => new StructType() } - override def getInputFilePathsInternal: Seq[String] = { - scan match { - case fileScan: FileScan => fileScan.fileIndex.inputFiles.toSeq - case _ => Seq.empty - } - } - override def getRootPathsInternal: Seq[String] = { scan match { case fileScan: FileScan => diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala index af49cfd1ba02..d64c5ae016c5 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala @@ -123,10 +123,6 @@ abstract class FileSourceScanExecTransformerBase( override def getDataSchema: StructType = relation.dataSchema - override def getInputFilePathsInternal: Seq[String] = { - relation.location.inputFiles.toSeq - } - override def getRootPathsInternal: Seq[String] = { FileIndexUtil.getRootPath(relation.location) } diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala index 938bac2b1b2c..85432350d4a2 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala @@ -71,11 +71,6 @@ case class HiveTableScanExecTransformer( override def getDataSchema: StructType = relation.tableMeta.dataSchema - override def getInputFilePathsInternal: Seq[String] = { - // FIXME how does a hive table expose file paths? - Seq.empty - } - // TODO: get root paths from hive table. override def getRootPathsInternal: Seq[String] = Seq.empty