diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala index 2f55510a7b1f..3736f0f14415 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala @@ -1025,6 +1025,7 @@ class GlutenClickHouseDeltaParquetWriteSuite } } + // FIXME: optimize testSparkVersionLE33("test parquet optimize with the path based table") { val dataPath = s"$basePath/lineitem_delta_parquet_optimize_path_based" clearDataPath(dataPath) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala index 16ed302a02f4..2d43d6694e93 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala @@ -19,7 +19,6 @@ package org.apache.gluten.execution.hive import org.apache.gluten.GlutenConfig import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite import org.apache.gluten.test.AllDataTypesWithComplexType.genTestData -import org.apache.gluten.utils.UTSystemParameters import org.apache.spark.SparkConf import org.apache.spark.gluten.NativeWriteChecker @@ -59,7 +58,6 @@ class GlutenClickHouseNativeWriteTableSuite .set("spark.databricks.delta.stalenessLimit", "3600000") .set("spark.gluten.sql.columnar.columnartorow", "true") .set(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1") - .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath) .set("spark.gluten.sql.columnar.iterator", "true") .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true") .set("spark.gluten.sql.enable.native.validation", "false") diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala index 44bd3a78f8bc..60ca58d9fc29 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala @@ -57,6 +57,7 @@ class GlutenClickHouseMergeTreeWriteSuite .set("spark.sql.adaptive.enabled", "true") .set("spark.sql.files.maxPartitionBytes", "20000000") .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true") + .set(CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString) .setCHSettings("min_insert_block_size_rows", 100000) .setCHSettings("mergetree.merge_after_insert", false) .setCHSettings("input_format_parquet_max_block_size", 8192) @@ -67,178 +68,172 @@ class GlutenClickHouseMergeTreeWriteSuite } test("test mergetree table write") { - withSQLConf((CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)) { - spark.sql(s""" - |DROP TABLE IF EXISTS lineitem_mergetree; - |""".stripMargin) + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree; + |""".stripMargin) - // write.format.default = mergetree - spark.sql(s""" - |CREATE TABLE IF NOT EXISTS lineitem_mergetree - |( - | l_orderkey bigint, - | l_partkey bigint, - | l_suppkey bigint, - | l_linenumber bigint, - | l_quantity double, - | l_extendedprice double, - | l_discount double, - | l_tax double, - | l_returnflag string, - | l_linestatus string, - | l_shipdate date, - | l_commitdate date, - | l_receiptdate date, - | l_shipinstruct string, - | l_shipmode string, - | l_comment string - |) - |USING clickhouse - |TBLPROPERTIES (write.format.default = 'mergetree') - |LOCATION '$basePath/lineitem_mergetree' - |""".stripMargin) + // write.format.default = mergetree + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |TBLPROPERTIES (write.format.default = 'mergetree') + |LOCATION '$basePath/lineitem_mergetree' + |""".stripMargin) - spark.sql(s""" - | insert into table lineitem_mergetree - | select * from lineitem - |""".stripMargin) + spark.sql(s""" + | insert into table lineitem_mergetree + | select * from lineitem + |""".stripMargin) - runTPCHQueryBySQL(1, q1("lineitem_mergetree")) { - df => - val plans = collect(df.queryExecution.executedPlan) { - case f: FileSourceScanExecTransformer => f - case w: WholeStageTransformer => w - } - assertResult(4)(plans.size) + runTPCHQueryBySQL(1, q1("lineitem_mergetree")) { + df => + val plans = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + case w: WholeStageTransformer => w + } + assertResult(4)(plans.size) - val mergetreeScan = plans(3).asInstanceOf[FileSourceScanExecTransformer] - assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree")) + val mergetreeScan = plans(3).asInstanceOf[FileSourceScanExecTransformer] + assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree")) - val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty) - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) - val addFiles = - fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assertResult(6)(addFiles.size) - assertResult(600572)(addFiles.map(_.rows).sum) + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) + val addFiles = + fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + assertResult(6)(addFiles.size) + assertResult(600572)(addFiles.map(_.rows).sum) - // GLUTEN-5060: check the unnecessary FilterExec - val wholeStageTransformer = plans(2).asInstanceOf[WholeStageTransformer] - val planNodeJson = wholeStageTransformer.substraitPlanJson - assert( - !planNodeJson - .replaceAll("\n", "") - .replaceAll(" ", "") - .contains("\"input\":{\"filter\":{")) - } + // GLUTEN-5060: check the unnecessary FilterExec + val wholeStageTransformer = plans(2).asInstanceOf[WholeStageTransformer] + val planNodeJson = wholeStageTransformer.substraitPlanJson + assert( + !planNodeJson + .replaceAll("\n", "") + .replaceAll(" ", "") + .contains("\"input\":{\"filter\":{")) } } test("test mergetree insert overwrite") { - withSQLConf((CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)) { - spark.sql(s""" - |DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite; - |""".stripMargin) + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite; + |""".stripMargin) - spark.sql(s""" - |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite - |( - | l_orderkey bigint, - | l_partkey bigint, - | l_suppkey bigint, - | l_linenumber bigint, - | l_quantity double, - | l_extendedprice double, - | l_discount double, - | l_tax double, - | l_returnflag string, - | l_linestatus string, - | l_shipdate date, - | l_commitdate date, - | l_receiptdate date, - | l_shipinstruct string, - | l_shipmode string, - | l_comment string - |) - |USING clickhouse - |LOCATION '$basePath/lineitem_mergetree_insertoverwrite' - |""".stripMargin) + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |LOCATION '$basePath/lineitem_mergetree_insertoverwrite' + |""".stripMargin) - spark.sql(s""" - | insert into table lineitem_mergetree_insertoverwrite - | select * from lineitem - |""".stripMargin) + spark.sql(s""" + | insert into table lineitem_mergetree_insertoverwrite + | select * from lineitem + |""".stripMargin) - spark.sql(s""" - | insert overwrite table lineitem_mergetree_insertoverwrite - | select * from lineitem where mod(l_orderkey,2) = 1 - |""".stripMargin) - val sql2 = - s""" - | select count(*) from lineitem_mergetree_insertoverwrite - | - |""".stripMargin - assertResult(300001)( - // total rows should remain unchanged - spark.sql(sql2).collect().apply(0).get(0) - ) - } + spark.sql(s""" + | insert overwrite table lineitem_mergetree_insertoverwrite + | select * from lineitem where mod(l_orderkey,2) = 1 + |""".stripMargin) + val sql2 = + s""" + | select count(*) from lineitem_mergetree_insertoverwrite + | + |""".stripMargin + assertResult(300001)( + // total rows should remain unchanged + spark.sql(sql2).collect().apply(0).get(0) + ) } test("test mergetree insert overwrite partitioned table with small table, static") { - withSQLConf((CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)) { - spark.sql(s""" - |DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite2; - |""".stripMargin) + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite2; + |""".stripMargin) - spark.sql(s""" - |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite2 - |( - | l_orderkey bigint, - | l_partkey bigint, - | l_suppkey bigint, - | l_linenumber bigint, - | l_quantity double, - | l_extendedprice double, - | l_discount double, - | l_tax double, - | l_returnflag string, - | l_linestatus string, - | l_shipdate date, - | l_commitdate date, - | l_receiptdate date, - | l_shipinstruct string, - | l_shipmode string, - | l_comment string - |) - |USING clickhouse - |PARTITIONED BY (l_shipdate) - |LOCATION '$basePath/lineitem_mergetree_insertoverwrite2' - |""".stripMargin) + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite2 + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |PARTITIONED BY (l_shipdate) + |LOCATION '$basePath/lineitem_mergetree_insertoverwrite2' + |""".stripMargin) - spark.sql(s""" - | insert into table lineitem_mergetree_insertoverwrite2 - | select * from lineitem - |""".stripMargin) + spark.sql(s""" + | insert into table lineitem_mergetree_insertoverwrite2 + | select * from lineitem + |""".stripMargin) - spark.sql( - s""" - | insert overwrite table lineitem_mergetree_insertoverwrite2 - | select * from lineitem where l_shipdate BETWEEN date'1993-02-01' AND date'1993-02-10' - |""".stripMargin) - val sql2 = - s""" - | select count(*) from lineitem_mergetree_insertoverwrite2 - | - |""".stripMargin - assertResult(2418)( - // total rows should remain unchanged - spark.sql(sql2).collect().apply(0).get(0) - ) - } + spark.sql( + s""" + | insert overwrite table lineitem_mergetree_insertoverwrite2 + | select * from lineitem where l_shipdate BETWEEN date'1993-02-01' AND date'1993-02-10' + |""".stripMargin) + val sql2 = + s""" + | select count(*) from lineitem_mergetree_insertoverwrite2 + | + |""".stripMargin + assertResult(2418)( + // total rows should remain unchanged + spark.sql(sql2).collect().apply(0).get(0) + ) } test("test mergetree insert overwrite partitioned table with small table, dynamic") { @@ -583,166 +578,164 @@ class GlutenClickHouseMergeTreeWriteSuite } test("test mergetree write with partition") { - withSQLConf((CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, "false")) { - spark.sql(s""" - |DROP TABLE IF EXISTS lineitem_mergetree_partition; - |""".stripMargin) - - spark.sql(s""" - |CREATE TABLE IF NOT EXISTS lineitem_mergetree_partition - |( - | l_orderkey bigint, - | l_partkey bigint, - | l_suppkey bigint, - | l_linenumber bigint, - | l_quantity double, - | l_extendedprice double, - | l_discount double, - | l_tax double, - | l_returnflag string, - | l_linestatus string, - | l_shipdate date, - | l_commitdate date, - | l_receiptdate date, - | l_shipinstruct string, - | l_shipmode string, - | l_comment string - |) - |USING clickhouse - |PARTITIONED BY (l_shipdate, l_returnflag) - |TBLPROPERTIES (orderByKey='l_orderkey', - | primaryKey='l_orderkey') - |LOCATION '$basePath/lineitem_mergetree_partition' - |""".stripMargin) - - // dynamic partitions - spark.sql(s""" - | insert into table lineitem_mergetree_partition - | select * from lineitem - |""".stripMargin) + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_partition; + |""".stripMargin) - // write with dataframe api - val source = spark.sql(s""" - |select - | l_orderkey , - | l_partkey , - | l_suppkey , - | l_linenumber , - | l_quantity , - | l_extendedprice , - | l_discount , - | l_tax , - | l_returnflag , - | l_linestatus , - | l_shipdate , - | l_commitdate , - | l_receiptdate , - | l_shipinstruct , - | l_shipmode , - | l_comment - | from lineitem - | where l_shipdate BETWEEN date'1993-01-01' AND date'1993-01-10' - |""".stripMargin) - - source.write - .format("clickhouse") - .mode(SaveMode.Append) - .insertInto("lineitem_mergetree_partition") - - // static partition - spark.sql(s""" - | insert into lineitem_mergetree_partition - | PARTITION (l_shipdate=date'1995-01-21', l_returnflag = 'A') - | (l_orderkey, - | l_partkey, - | l_suppkey, - | l_linenumber, - | l_quantity, - | l_extendedprice, - | l_discount, - | l_tax, - | l_linestatus, - | l_commitdate, - | l_receiptdate, - | l_shipinstruct, - | l_shipmode, - | l_comment) - | select l_orderkey, - | l_partkey, - | l_suppkey, - | l_linenumber, - | l_quantity, - | l_extendedprice, - | l_discount, - | l_tax, - | l_linestatus, - | l_commitdate, - | l_receiptdate, - | l_shipinstruct, - | l_shipmode, - | l_comment from lineitem - | where l_shipdate BETWEEN date'1993-02-01' AND date'1993-02-10' - |""".stripMargin) + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_partition + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |PARTITIONED BY (l_shipdate, l_returnflag) + |TBLPROPERTIES (orderByKey='l_orderkey', + | primaryKey='l_orderkey') + |LOCATION '$basePath/lineitem_mergetree_partition' + |""".stripMargin) - runTPCHQueryBySQL(1, q1("lineitem_mergetree_partition"), compareResult = false) { - df => - val result = df.collect() - assertResult(4)(result.length) - assertResult("A")(result(0).getString(0)) - assertResult("F")(result(0).getString(1)) - assertResult(3865234.0)(result(0).getDouble(2)) + // dynamic partitions + spark.sql(s""" + | insert into table lineitem_mergetree_partition + | select * from lineitem + |""".stripMargin) - assertResult("N")(result(2).getString(0)) - assertResult("O")(result(2).getString(1)) - assertResult(7454519.0)(result(2).getDouble(2)) + // write with dataframe api + val source = spark.sql(s""" + |select + | l_orderkey , + | l_partkey , + | l_suppkey , + | l_linenumber , + | l_quantity , + | l_extendedprice , + | l_discount , + | l_tax , + | l_returnflag , + | l_linestatus , + | l_shipdate , + | l_commitdate , + | l_receiptdate , + | l_shipinstruct , + | l_shipmode , + | l_comment + | from lineitem + | where l_shipdate BETWEEN date'1993-01-01' AND date'1993-01-10' + |""".stripMargin) + + source.write + .format("clickhouse") + .mode(SaveMode.Append) + .insertInto("lineitem_mergetree_partition") + + // static partition + spark.sql(s""" + | insert into lineitem_mergetree_partition + | PARTITION (l_shipdate=date'1995-01-21', l_returnflag = 'A') + | (l_orderkey, + | l_partkey, + | l_suppkey, + | l_linenumber, + | l_quantity, + | l_extendedprice, + | l_discount, + | l_tax, + | l_linestatus, + | l_commitdate, + | l_receiptdate, + | l_shipinstruct, + | l_shipmode, + | l_comment) + | select l_orderkey, + | l_partkey, + | l_suppkey, + | l_linenumber, + | l_quantity, + | l_extendedprice, + | l_discount, + | l_tax, + | l_linestatus, + | l_commitdate, + | l_receiptdate, + | l_shipinstruct, + | l_shipmode, + | l_comment from lineitem + | where l_shipdate BETWEEN date'1993-02-01' AND date'1993-02-10' + |""".stripMargin) + + runTPCHQueryBySQL(1, q1("lineitem_mergetree_partition"), compareResult = false) { + df => + val result = df.collect() + assertResult(4)(result.length) + assertResult("A")(result(0).getString(0)) + assertResult("F")(result(0).getString(1)) + assertResult(3865234.0)(result(0).getDouble(2)) - val scanExec = collect(df.queryExecution.executedPlan) { - case f: FileSourceScanExecTransformer => f - } - assertResult(1)(scanExec.size) + assertResult("N")(result(2).getString(0)) + assertResult("O")(result(2).getString(1)) + assertResult(7454519.0)(result(2).getDouble(2)) - val mergetreeScan = scanExec.head - assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree")) - assertResult(3745)(mergetreeScan.metrics("numFiles").value) + val scanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExecTransformer => f + } + assertResult(1)(scanExec.size) - val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) - assertResult("l_orderkey")( - ClickHouseTableV2 - .getTable(fileIndex.deltaLog) - .orderByKeyOption - .get - .mkString(",")) - assertResult("l_orderkey")( - ClickHouseTableV2 - .getTable(fileIndex.deltaLog) - .primaryKeyOption - .get - .mkString(",")) - assertResult(2)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size) - assertResult("l_shipdate")( - ClickHouseTableV2 - .getTable(fileIndex.deltaLog) - .partitionColumns - .head) - assertResult("l_returnflag")( - ClickHouseTableV2 - .getTable(fileIndex.deltaLog) - .partitionColumns(1)) - val addFiles = - fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + val mergetreeScan = scanExec.head + assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree")) + assertResult(3745)(mergetreeScan.metrics("numFiles").value) - assertResult(3836)(addFiles.size) - assertResult(605363)(addFiles.map(_.rows).sum) - assertResult(2)(addFiles.count(_.partitionValues("l_shipdate").equals("1992-06-01"))) - assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1993-01-01"))) - assertResult(3)(addFiles.count(_.partitionValues("l_shipdate").equals("1995-01-21"))) - } + val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assertResult("l_orderkey")( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .orderByKeyOption + .get + .mkString(",")) + assertResult("l_orderkey")( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .primaryKeyOption + .get + .mkString(",")) + assertResult(2)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size) + assertResult("l_shipdate")( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .partitionColumns + .head) + assertResult("l_returnflag")( + ClickHouseTableV2 + .getTable(fileIndex.deltaLog) + .partitionColumns(1)) + val addFiles = + fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) + + assertResult(3836)(addFiles.size) + assertResult(605363)(addFiles.map(_.rows).sum) + assertResult(2)(addFiles.count(_.partitionValues("l_shipdate").equals("1992-06-01"))) + assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1993-01-01"))) + assertResult(3)(addFiles.count(_.partitionValues("l_shipdate").equals("1995-01-21"))) } } - test("test mergetree write with bucket table") { + testSparkVersionLE33("test mergetree write with bucket table") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree_bucket; |""".stripMargin) @@ -982,7 +975,7 @@ class GlutenClickHouseMergeTreeWriteSuite } } - test("test mergetree CTAS complex") { + test("test mergetree CTAS partition") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree_ctas2; |""".stripMargin) @@ -991,8 +984,6 @@ class GlutenClickHouseMergeTreeWriteSuite |CREATE TABLE IF NOT EXISTS lineitem_mergetree_ctas2 |USING clickhouse |PARTITIONED BY (l_shipdate) - |CLUSTERED BY (l_orderkey) - |${if (spark32) "" else "SORTED BY (l_partkey, l_returnflag)"} INTO 4 BUCKETS |LOCATION '$basePath/lineitem_mergetree_ctas2' | as select * from lineitem |""".stripMargin) @@ -1630,7 +1621,7 @@ class GlutenClickHouseMergeTreeWriteSuite } } - test("test mergetree with primary keys filter pruning by driver with bucket") { + testSparkVersionLE33("test mergetree with primary keys filter pruning by driver with bucket") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree_pk_pruning_by_driver_bucket; |""".stripMargin) @@ -1802,7 +1793,7 @@ class GlutenClickHouseMergeTreeWriteSuite } } - test("test mergetree with column case sensitive") { + testSparkVersionLE33("test mergetree with column case sensitive") { spark.sql(s""" |DROP TABLE IF EXISTS LINEITEM_MERGETREE_CASE_SENSITIVE; |""".stripMargin) @@ -1841,7 +1832,7 @@ class GlutenClickHouseMergeTreeWriteSuite runTPCHQueryBySQL(6, q6("lineitem_mergetree_case_sensitive")) { _ => } } - test("test mergetree with partition with whitespace") { + testSparkVersionLE33("test mergetree with partition with whitespace") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree_partition_with_whitespace; |""".stripMargin)