diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index 6a473cc54f7e..87e95cbe9dda 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -188,20 +188,33 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
     var metadataGlutenExist: Boolean = false
     var metadataBinExist: Boolean = false
     var dataBinExist: Boolean = false
+    var hasCommits = false
     client
       .listObjects(args)
       .forEach(
         obj => {
           objectCount += 1
-          if (obj.get().objectName().contains("metadata.gluten")) {
+          val objectName = obj.get().objectName()
+          if (objectName.contains("metadata.gluten")) {
             metadataGlutenExist = true
-          } else if (obj.get().objectName().contains("meta.bin")) {
+          } else if (objectName.contains("meta.bin")) {
             metadataBinExist = true
-          } else if (obj.get().objectName().contains("data.bin")) {
+          } else if (objectName.contains("data.bin")) {
             dataBinExist = true
+          } else if (objectName.contains("_commits")) {
+            // Spark 3.5 has a _commits directory:
+            // table/_delta_log/_commits/
+            hasCommits = true
           }
         })
-    assertResult(5)(objectCount)
+
+    if (isSparkVersionGE("3.5")) {
+      assertResult(6)(objectCount)
+      assert(hasCommits)
+    } else {
+      assertResult(5)(objectCount)
+    }
+
     assert(metadataGlutenExist)
     assert(metadataBinExist)
     assert(dataBinExist)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index 4972861152fd..f914eaa1860a 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -178,11 +178,13 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
     super.beforeAll()
   }
 
-  protected val rootPath: String = this.getClass.getResource("/").getPath
-  protected val basePath: String = rootPath + "tests-working-home"
-  protected val warehouse: String = basePath + "/spark-warehouse"
-  protected val metaStorePathAbsolute: String = basePath + "/meta"
-  protected val hiveMetaStoreDB: String = metaStorePathAbsolute + "/metastore_db"
+  final protected val rootPath: String = this.getClass.getResource("/").getPath
+  final protected val basePath: String = rootPath + "tests-working-home"
+  final protected val warehouse: String = basePath + "/spark-warehouse"
+  final protected val metaStorePathAbsolute: String = basePath + "/meta"
+
+  protected val hiveMetaStoreDB: String =
+    s"$metaStorePathAbsolute/${getClass.getSimpleName}/metastore_db"
 
   final override protected val resourcePath: String = "" // ch not need this
   override protected val fileFormat: String = "parquet"
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
index 28ff5874fabd..383681733026 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
@@ -16,7 +16,8 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.execution.AllDataTypesWithComplexType.genTestData
+import org.apache.gluten.test.AllDataTypesWithComplexType
+import org.apache.gluten.test.AllDataTypesWithComplexType.genTestData
 
 import org.apache.spark.SparkConf
 
 class GlutenClickhouseCountDistinctSuite extends GlutenClickHouseWholeStageTransformerSuite {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseFunctionSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseFunctionSuite.scala
index 1d4d1b6f8afb..ac18f256e807 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseFunctionSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseFunctionSuite.scala
@@ -20,12 +20,6 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.utils.UTSystemParameters
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.delta.DeltaLog
-
-import org.apache.commons.io.FileUtils
-
-import java.io.File
 
 class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
   override protected val needCopyParquetToTablePath = true
@@ -39,9 +33,6 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
     createNotNullTPCHTablesInParquet(tablesPath)
   }
 
-  private var _hiveSpark: SparkSession = _
-  override protected def spark: SparkSession = _hiveSpark
-
   override protected def sparkConf: SparkConf = {
     new SparkConf()
       .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
@@ -69,70 +60,21 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
       .setMaster("local[1]")
   }
 
-  override protected def initializeSession(): Unit = {
-    if (_hiveSpark == null) {
-      val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
-      _hiveSpark = SparkSession
-        .builder()
-        .config(sparkConf)
-        .enableHiveSupport()
-        .config(
-          "javax.jdo.option.ConnectionURL",
-          s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
-        .getOrCreate()
-    }
-  }
-
-  override def beforeAll(): Unit = {
-    // prepare working paths
-    val basePathDir = new File(basePath)
-    if (basePathDir.exists()) {
-      FileUtils.forceDelete(basePathDir)
-    }
-    FileUtils.forceMkdir(basePathDir)
-    FileUtils.forceMkdir(new File(warehouse))
-    FileUtils.forceMkdir(new File(metaStorePathAbsolute))
-    FileUtils.copyDirectory(new File(rootPath + resourcePath), new File(tablesPath))
-    super.beforeAll()
-  }
-
-  override protected def afterAll(): Unit = {
-    DeltaLog.clearCache()
-
-    try {
-      super.afterAll()
-    } finally {
-      try {
-        if (_hiveSpark != null) {
-          try {
-            _hiveSpark.sessionState.catalog.reset()
-          } finally {
-            _hiveSpark.stop()
-            _hiveSpark = null
-          }
-        }
-      } finally {
-        SparkSession.clearActiveSession()
-        SparkSession.clearDefaultSession()
-      }
-    }
-  }
-
   test("test uuid - write and read") {
     withSQLConf(
       ("spark.gluten.sql.native.writer.enabled", "true"),
       (GlutenConfig.GLUTEN_ENABLED.key, "true")) {
+      withTable("uuid_test") {
+        spark.sql("create table if not exists uuid_test (id string) using parquet")
-      spark.sql("drop table if exists uuid_test")
-      spark.sql("create table if not exists uuid_test (id string) stored as parquet")
-
-      val df = spark.sql("select regexp_replace(uuid(), '-', '') as id from range(1)")
-      df.cache()
-      df.write.insertInto("uuid_test")
+        val df =
spark.sql("select regexp_replace(uuid(), '-', '') as id from range(1)") + df.cache() + df.write.insertInto("uuid_test") - val df2 = spark.table("uuid_test") - val diffCount = df.exceptAll(df2).count() - assert(diffCount == 0) + val df2 = spark.table("uuid_test") + val diffCount = df.exceptAll(df2).count() + assert(diffCount == 0) + } } } @@ -181,49 +123,51 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite { } test("GLUTEN-5981 null value from get_json_object") { - spark.sql("create table json_t1 (a string) using parquet") - spark.sql("insert into json_t1 values ('{\"a\":null}')") - runQueryAndCompare( - """ - |SELECT get_json_object(a, '$.a') is null from json_t1 - |""".stripMargin - )(df => checkFallbackOperators(df, 0)) - spark.sql("drop table json_t1") + withTable("json_t1") { + spark.sql("create table json_t1 (a string) using parquet") + spark.sql("insert into json_t1 values ('{\"a\":null}')") + runQueryAndCompare( + """ + |SELECT get_json_object(a, '$.a') is null from json_t1 + |""".stripMargin + )(df => checkFallbackOperators(df, 0)) + } } test("Fix arrayDistinct(Array(Nullable(Decimal))) core dump") { - val create_sql = - """ - |create table if not exists test( - | dec array - |) using parquet - |""".stripMargin - val fill_sql = - """ - |insert into test values(array(1, 2, null)), (array(null, 2,3, 5)) - |""".stripMargin - val query_sql = - """ - |select array_distinct(dec) from test; - |""".stripMargin - spark.sql(create_sql) - spark.sql(fill_sql) - compareResultsAgainstVanillaSpark(query_sql, true, { _ => }) - spark.sql("drop table test") + withTable("json_t1") { + val create_sql = + """ + |create table if not exists test( + | dec array + |) using parquet + |""".stripMargin + val fill_sql = + """ + |insert into test values(array(1, 2, null)), (array(null, 2,3, 5)) + |""".stripMargin + val query_sql = + """ + |select array_distinct(dec) from test; + |""".stripMargin + spark.sql(create_sql) + spark.sql(fill_sql) + compareResultsAgainstVanillaSpark(query_sql, true, { _ => }) + } } test("intersect all") { - spark.sql("create table t1 (a int, b string) using parquet") - spark.sql("insert into t1 values (1, '1'),(2, '2'),(3, '3'),(4, '4'),(5, '5'),(6, '6')") - spark.sql("create table t2 (a int, b string) using parquet") - spark.sql("insert into t2 values (4, '4'),(5, '5'),(6, '6'),(7, '7'),(8, '8'),(9, '9')") - runQueryAndCompare( - """ - |SELECT a,b FROM t1 INTERSECT ALL SELECT a,b FROM t2 - |""".stripMargin - )(df => checkFallbackOperators(df, 0)) - spark.sql("drop table t1") - spark.sql("drop table t2") + withTable("t1", "t2") { + spark.sql("create table t1 (a int, b string) using parquet") + spark.sql("insert into t1 values (1, '1'),(2, '2'),(3, '3'),(4, '4'),(5, '5'),(6, '6')") + spark.sql("create table t2 (a int, b string) using parquet") + spark.sql("insert into t2 values (4, '4'),(5, '5'),(6, '6'),(7, '7'),(8, '8'),(9, '9')") + runQueryAndCompare( + """ + |SELECT a,b FROM t1 INTERSECT ALL SELECT a,b FROM t2 + |""".stripMargin + )(df => checkFallbackOperators(df, 0)) + } } test("array decimal32 CH column to row") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala similarity index 94% rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala rename to 
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
similarity index 94%
rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
rename to backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
index 83bc4e76b1bd..cc9155613343 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
@@ -14,13 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.execution
+package org.apache.gluten.execution.hive
 
 import org.apache.gluten.GlutenConfig
+import org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, ProjectExecTransformer, TransformSupport}
+import org.apache.gluten.test.AllDataTypesWithComplexType
 import org.apache.gluten.utils.UTSystemParameters
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
@@ -29,64 +31,14 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.hadoop.fs.Path
 
 import java.io.{File, PrintWriter}
-import java.sql.{Date, Timestamp}
 
 import scala.reflect.ClassTag
 
-case class AllDataTypesWithComplexType(
-    string_field: String = null,
-    int_field: java.lang.Integer = null,
-    long_field: java.lang.Long = null,
-    float_field: java.lang.Float = null,
-    double_field: java.lang.Double = null,
-    short_field: java.lang.Short = null,
-    byte_field: java.lang.Byte = null,
-    boolean_field: java.lang.Boolean = null,
-    decimal_field: java.math.BigDecimal = null,
-    date_field: java.sql.Date = null,
-    timestamp_field: java.sql.Timestamp = null,
-    array: Seq[Int] = null,
-    arrayContainsNull: Seq[Option[Int]] = null,
-    map: Map[Int, Long] = null,
-    mapValueContainsNull: Map[Int, Option[Long]] = null
-)
-
-object AllDataTypesWithComplexType {
-  def genTestData(): Seq[AllDataTypesWithComplexType] = {
-    (0 to 199).map {
-      i =>
-        if (i % 100 == 1) {
-          AllDataTypesWithComplexType()
-        } else {
-          AllDataTypesWithComplexType(
-            s"$i",
-            i,
-            i.toLong,
-            i.toFloat,
-            i.toDouble,
-            i.toShort,
-            i.toByte,
-            i % 2 == 0,
-            new java.math.BigDecimal(i + ".56"),
-            Date.valueOf(new Date(System.currentTimeMillis()).toLocalDate.plusDays(i % 10)),
-            Timestamp.valueOf(
-              new Timestamp(System.currentTimeMillis()).toLocalDateTime.plusDays(i % 10)),
-            Seq.apply(i + 1, i + 2, i + 3),
-            Seq.apply(Option.apply(i + 1), Option.empty, Option.apply(i + 3)),
-            Map.apply((i + 1, i + 2), (i + 3, i + 4)),
-            Map.empty
-          )
-        }
-    }
-  }
-}
-
 class GlutenClickHouseHiveTableSuite
   extends GlutenClickHouseWholeStageTransformerSuite
+  with ReCreateHiveSession
   with AdaptiveSparkPlanHelper {
 
-  private var _hiveSpark: SparkSession = _
-
   override protected def sparkConf: SparkConf = {
     new SparkConf()
       .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
@@ -119,22 +71,6 @@ class GlutenClickHouseHiveTableSuite
       .setMaster("local[*]")
   }
 
-  override protected def spark: SparkSession = _hiveSpark
-
-  override protected def initializeSession(): Unit = {
-    if (_hiveSpark == null) {
-      val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
-      _hiveSpark = SparkSession
-        .builder()
-        .config(sparkConf)
-        .enableHiveSupport()
-        .config(
-          "javax.jdo.option.ConnectionURL",
-          s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
-        .getOrCreate()
-    }
-  }
-
txt_table_name = "hive_txt_test" private val txt_user_define_input = "hive_txt_user_define_input" private val json_table_name = "hive_json_test" @@ -235,24 +171,7 @@ class GlutenClickHouseHiveTableSuite override protected def afterAll(): Unit = { DeltaLog.clearCache() - - try { - super.afterAll() - } finally { - try { - if (_hiveSpark != null) { - try { - _hiveSpark.sessionState.catalog.reset() - } finally { - _hiveSpark.stop() - _hiveSpark = null - } - } - } finally { - SparkSession.clearActiveSession() - SparkSession.clearDefaultSession() - } - } + super.afterAll() } test("test hive text table") { @@ -957,7 +876,7 @@ class GlutenClickHouseHiveTableSuite val select_sql_4 = "select id, get_json_object(data, '$.v111') from test_tbl_3337" val select_sql_5 = "select id, get_json_object(data, 'v112') from test_tbl_3337" val select_sql_6 = - "select id, get_json_object(data, '$.id') from test_tbl_3337 where id = 123"; + "select id, get_json_object(data, '$.id') from test_tbl_3337 where id = 123" compareResultsAgainstVanillaSpark(select_sql_1, compareResult = true, _ => {}) compareResultsAgainstVanillaSpark(select_sql_2, compareResult = true, _ => {}) compareResultsAgainstVanillaSpark(select_sql_3, compareResult = true, _ => {}) @@ -1311,7 +1230,7 @@ class GlutenClickHouseHiveTableSuite .format(dataPath) val select_sql = "select * from test_tbl_6506" spark.sql(create_table_sql) - compareResultsAgainstVanillaSpark(select_sql, true, _ => {}) + compareResultsAgainstVanillaSpark(select_sql, compareResult = true, _ => {}) spark.sql("drop table test_tbl_6506") } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala similarity index 96% rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala rename to backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala index 652b15fc2da0..9e3fa00787de 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala @@ -14,33 +14,27 @@ * See the License for the specific language governing permissions and * limitations under the License. 
  */
-package org.apache.gluten.execution
+package org.apache.gluten.execution.hive
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.AllDataTypesWithComplexType.genTestData
+import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite
+import org.apache.gluten.test.AllDataTypesWithComplexType.genTestData
 import org.apache.gluten.utils.UTSystemParameters
 
 import org.apache.spark.SparkConf
 import org.apache.spark.gluten.NativeWriteChecker
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{DecimalType, LongType, StringType, StructField, StructType}
-
-import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.sql.types._
 
 import scala.reflect.runtime.universe.TypeTag
 
 class GlutenClickHouseNativeWriteTableSuite
   extends GlutenClickHouseWholeStageTransformerSuite
   with AdaptiveSparkPlanHelper
-  with SharedSparkSession
-  with BeforeAndAfterAll
+  with ReCreateHiveSession
   with NativeWriteChecker {
 
-  private var _hiveSpark: SparkSession = _
-
   override protected def sparkConf: SparkConf = {
     var sessionTimeZone = "GMT"
     if (isSparkVersionGE("3.5")) {
@@ -80,45 +74,12 @@ class GlutenClickHouseNativeWriteTableSuite
     basePath + "/中文/spark-warehouse"
   }
 
-  override protected def spark: SparkSession = _hiveSpark
-
-  override protected def initializeSession(): Unit = {
-    if (_hiveSpark == null) {
-      val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
-      _hiveSpark = SparkSession
-        .builder()
-        .config(sparkConf)
-        .enableHiveSupport()
-        .config(
-          "javax.jdo.option.ConnectionURL",
-          s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
-        .getOrCreate()
-    }
-  }
-
   private val table_name_template = "hive_%s_test"
   private val table_name_vanilla_template = "hive_%s_test_written_by_vanilla"
 
   override protected def afterAll(): Unit = {
     DeltaLog.clearCache()
-
-    try {
-      super.afterAll()
-    } finally {
-      try {
-        if (_hiveSpark != null) {
-          try {
-            _hiveSpark.sessionState.catalog.reset()
-          } finally {
-            _hiveSpark.stop()
-            _hiveSpark = null
-          }
-        }
-      } finally {
-        SparkSession.clearActiveSession()
-        SparkSession.clearDefaultSession()
-      }
-    }
+    super.afterAll()
   }
 
   def getColumnName(s: String): String = {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala
similarity index 87%
rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
rename to backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala
index f9e831cb4aa7..d359428d03ca 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala
@@ -14,12 +14,14 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
  */
-package org.apache.gluten.execution
+package org.apache.gluten.execution.hive
+
+import org.apache.gluten.execution.GlutenClickHouseTPCHAbstractSuite
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.SparkSession.{getActiveSession, getDefaultSession}
-import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog}
+import org.apache.spark.sql.delta.ClickhouseSnapshot
 import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -33,7 +35,8 @@ import java.io.File
 
 // This suite is to make sure clickhouse commands works well even after spark restart
 class GlutenClickHouseTableAfterRestart
   extends GlutenClickHouseTPCHAbstractSuite
-  with AdaptiveSparkPlanHelper {
+  with AdaptiveSparkPlanHelper
+  with ReCreateHiveSession {
 
   override protected val needCopyParquetToTablePath = true
 
@@ -64,56 +67,18 @@ class GlutenClickHouseTableAfterRestart
       .set(
         "spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size",
         "8192")
+      .setMaster("local[2]")
   }
 
   override protected def createTPCHNotNullTables(): Unit = {
     createNotNullTPCHTablesInParquet(tablesPath)
   }
 
-  private var _hiveSpark: SparkSession = _
-  override protected def spark: SparkSession = _hiveSpark
-
-  override protected def initializeSession(): Unit = {
-    if (_hiveSpark == null) {
-      val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db_" + current_db_num
-      current_db_num += 1
-
-      _hiveSpark = SparkSession
-        .builder()
-        .config(sparkConf)
-        .enableHiveSupport()
-        .config(
-          "javax.jdo.option.ConnectionURL",
-          s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
-        .master("local[2]")
-        .getOrCreate()
-    }
-  }
-
-  override protected def afterAll(): Unit = {
-    DeltaLog.clearCache()
-
-    try {
-      super.afterAll()
-    } finally {
-      try {
-        if (_hiveSpark != null) {
-          try {
-            _hiveSpark.sessionState.catalog.reset()
-          } finally {
-            _hiveSpark.stop()
-            _hiveSpark = null
-          }
-        }
-      } finally {
-        SparkSession.clearActiveSession()
-        SparkSession.clearDefaultSession()
-      }
-    }
-  }
-
   var current_db_num: Int = 0
 
+  override protected val hiveMetaStoreDB: String =
+    metaStorePathAbsolute + "/metastore_db_" + current_db_num
+
   test("test mergetree after restart") {
     spark.sql(s"""
                  |DROP TABLE IF EXISTS lineitem_mergetree;
@@ -347,22 +312,22 @@ class GlutenClickHouseTableAfterRestart
       SparkSession.clearDefaultSession()
     }
 
-    val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db_"
+    val metaStoreDB = metaStorePathAbsolute + "/metastore_db_"
     // use metastore_db2 to avoid issue: "Another instance of Derby may have already booted the database"
-    val destDir = new File(hiveMetaStoreDB + current_db_num)
-    destDir.mkdirs()
-    FileUtils.copyDirectory(new File(hiveMetaStoreDB + (current_db_num - 1)), destDir)
-    _hiveSpark = null
-    _hiveSpark = SparkSession
-      .builder()
-      .config(sparkConf)
-      .enableHiveSupport()
-      .config(
-        "javax.jdo.option.ConnectionURL",
-        s"jdbc:derby:;databaseName=$hiveMetaStoreDB$current_db_num")
-      .master("local[2]")
-      .getOrCreate()
     current_db_num += 1
+    val destDir = new File(metaStoreDB + current_db_num)
+    destDir.mkdirs()
+    FileUtils.copyDirectory(new File(metaStoreDB + (current_db_num - 1)), destDir)
+    updateHiveSession(
+      SparkSession
+        .builder()
+        .config(sparkConf)
+        .enableHiveSupport()
+        .config(
+          "javax.jdo.option.ConnectionURL",
+          s"jdbc:derby:;databaseName=$metaStoreDB$current_db_num")
+        .getOrCreate()
+    )
   }
 }
 // scalastyle:off line.size.limit
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/ReCreateHiveSession.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/ReCreateHiveSession.scala
new file mode 100644
index 000000000000..c251e46364f5
--- /dev/null
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/ReCreateHiveSession.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.test.SharedSparkSession
+
+import org.scalatest.BeforeAndAfterAll
+
+trait ReCreateHiveSession extends SharedSparkSession with BeforeAndAfterAll {
+
+  protected val hiveMetaStoreDB: String
+
+  private var _hiveSpark: SparkSession = _
+
+  override protected def spark: SparkSession = _hiveSpark
+
+  override protected def initializeSession(): Unit = {
+    if (_hiveSpark == null) {
+      _hiveSpark = SparkSession
+        .builder()
+        .config(sparkConf)
+        .enableHiveSupport()
+        .config(
+          "javax.jdo.option.ConnectionURL",
+          s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
+        .getOrCreate()
+    }
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      super.afterAll()
+    } finally {
+      try {
+        if (_hiveSpark != null) {
+          try {
+            _hiveSpark.sessionState.catalog.reset()
+          } finally {
+            _hiveSpark.stop()
+            _hiveSpark = null
+          }
+        }
+      } finally {
+        SparkSession.clearActiveSession()
+        SparkSession.clearDefaultSession()
+      }
+    }
+  }
+
+  protected def updateHiveSession(newSession: SparkSession): Unit = {
+    _hiveSpark = null
+    _hiveSpark = newSession
+  }
+}
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala
index 1e6509c00884..0a8d1729c810 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala
@@ -35,12 +35,6 @@ class GlutenParquetFilterSuite
     with GlutenTPCHBase
     with Logging {
 
-  override protected val rootPath = this.getClass.getResource("/").getPath
-  override protected val basePath = rootPath + "tests-working-home"
-  override protected val warehouse = basePath + "/spark-warehouse"
-  override protected val metaStorePathAbsolute = basePath + "/meta"
-  override protected val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
-
   private val tpchQueriesResourceFolder: String =
     rootPath + "../../../../gluten-core/src/test/resources/tpch-queries"
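
With the trait in place, each suite trades roughly forty lines of copied session bookkeeping for a single mixin; the Derby metastore location is the only remaining per-suite knob, and GlutenClickHouseWholeStageTransformerSuite now defaults it to a per-class directory. A sketch of what a consuming suite reduces to (the suite name is illustrative):

    class MyHiveSuite
      extends GlutenClickHouseWholeStageTransformerSuite
      with ReCreateHiveSession {
      // hiveMetaStoreDB is inherited as
      //   s"$metaStorePathAbsolute/${getClass.getSimpleName}/metastore_db",
      // so each suite boots its embedded Derby metastore in its own directory;
      // only suites needing a custom location (e.g. the restart test) override it.
    }
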
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/test/AllDataTypesWithComplexType.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/test/AllDataTypesWithComplexType.scala
new file mode 100644
index 000000000000..19abcbea433a
--- /dev/null
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/test/AllDataTypesWithComplexType.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.test
+
+import java.sql.{Date, Timestamp}
+
+case class AllDataTypesWithComplexType(
+    string_field: String = null,
+    int_field: java.lang.Integer = null,
+    long_field: java.lang.Long = null,
+    float_field: java.lang.Float = null,
+    double_field: java.lang.Double = null,
+    short_field: java.lang.Short = null,
+    byte_field: java.lang.Byte = null,
+    boolean_field: java.lang.Boolean = null,
+    decimal_field: java.math.BigDecimal = null,
+    date_field: java.sql.Date = null,
+    timestamp_field: java.sql.Timestamp = null,
+    array: Seq[Int] = null,
+    arrayContainsNull: Seq[Option[Int]] = null,
+    map: Map[Int, Long] = null,
+    mapValueContainsNull: Map[Int, Option[Long]] = null
+)
+
+object AllDataTypesWithComplexType {
+  def genTestData(): Seq[AllDataTypesWithComplexType] = {
+    (0 to 199).map {
+      i =>
+        if (i % 100 == 1) {
+          AllDataTypesWithComplexType()
+        } else {
+          AllDataTypesWithComplexType(
+            s"$i",
+            i,
+            i.toLong,
+            i.toFloat,
+            i.toDouble,
+            i.toShort,
+            i.toByte,
+            i % 2 == 0,
+            new java.math.BigDecimal(i + ".56"),
+            Date.valueOf(new Date(System.currentTimeMillis()).toLocalDate.plusDays(i % 10)),
+            Timestamp.valueOf(
+              new Timestamp(System.currentTimeMillis()).toLocalDateTime.plusDays(i % 10)),
+            Seq.apply(i + 1, i + 2, i + 3),
+            Seq.apply(Option.apply(i + 1), Option.empty, Option.apply(i + 3)),
+            Map.apply((i + 1, i + 2), (i + 3, i + 4)),
+            Map.empty
+          )
+        }
+    }
+  }
+}
diff --git a/gluten-celeborn/clickhouse/pom.xml b/gluten-celeborn/clickhouse/pom.xml
index 284a8f57282a..9e64e77ce6ea 100755
--- a/gluten-celeborn/clickhouse/pom.xml
+++ b/gluten-celeborn/clickhouse/pom.xml
@@ -148,6 +148,38 @@
       <version>${hadoop.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-core</artifactId>
+      <version>${arrow.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>${arrow.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
diff --git a/gluten-core/src/test/scala/org/apache/gluten/test/FallbackUtil.scala b/gluten-core/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
index d2626ab275ce..3d26dd16c4eb 100644
--- a/gluten-core/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
+++ b/gluten-core/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
@@ -20,11 +20,11 @@ import org.apache.gluten.extension.GlutenPlan
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, AQEShuffleReadExec, QueryStageExec}
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 
 /**
- * attention: if AQE is enable,This method will only be executed correctly after the execution plan
+ * attention: if AQE is enabled, this method will only be executed correctly after the execution plan
  * is fully determined
  */
@@ -42,10 +42,14 @@ object FallbackUtil extends Logging with AdaptiveSparkPlanHelper {
       true
     case WholeStageCodegenExec(_) =>
       true
+    case ColumnarInputAdapter(_) =>
+      true
     case InputAdapter(_) =>
       true
     case AdaptiveSparkPlanExec(_, _, _, _, _) =>
       true
+    case AQEShuffleReadExec(_, _) =>
+      true
     case _: LimitExec =>
       true
     // for ut
@@ -57,30 +61,15 @@ object FallbackUtil extends Logging with AdaptiveSparkPlanHelper {
       true
     case _: ReusedExchangeExec =>
       true
-    case p: SparkPlan if p.supportsColumnar =>
-      true
     case _ =>
       false
   }
 }
 
  def hasFallback(plan: SparkPlan): Boolean = {
-    var fallbackOperator: Seq[SparkPlan] = null
-    if (plan.isInstanceOf[AdaptiveSparkPlanExec]) {
-      fallbackOperator = collectWithSubqueries(plan) {
-        case plan if !plan.isInstanceOf[GlutenPlan] && !skip(plan) =>
-          plan
-      }
-    } else {
-      fallbackOperator = plan.collectWithSubqueries {
-        case plan if !plan.isInstanceOf[GlutenPlan] && !skip(plan) =>
-          plan
-      }
-    }
-
-    if (fallbackOperator.nonEmpty) {
-      fallbackOperator.foreach(operator => log.info(s"gluten fallback operator:{$operator}"))
-    }
+    val fallbackOperator = collectWithSubqueries(plan) { case plan => plan }.filterNot(
+      plan => plan.isInstanceOf[GlutenPlan] || skip(plan))
+    fallbackOperator.foreach(operator => log.info(s"gluten fallback operator:{$operator}"))
     fallbackOperator.nonEmpty
   }
 }
diff --git a/gluten-ut/pom.xml b/gluten-ut/pom.xml
index 90644b832bf8..a016eccaed20 100644
--- a/gluten-ut/pom.xml
+++ b/gluten-ut/pom.xml
@@ -31,7 +31,7 @@
   <artifactId>gluten-ut</artifactId>
   <packaging>pom</packaging>
-  <name>Gluten Unit Test</name>
+  <name>Gluten Unit Test Parent</name>
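
For context on the FallbackUtil change: hasFallback previously branched on AdaptiveSparkPlanExec and skipped any plan with supportsColumnar, which could hide vanilla columnar operators; it now collects every node (subqueries included) via AdaptiveSparkPlanHelper.collectWithSubqueries and filters, so wrapper nodes must be whitelisted explicitly, hence the new ColumnarInputAdapter and AQEShuffleReadExec cases. A minimal usage sketch (the table name is illustrative):

    val plan = spark.sql("select count(distinct a) from t").queryExecution.executedPlan
    // Holds only if every operator, including those inside subqueries,
    // is a GlutenPlan or one of the whitelisted wrapper nodes.
    assert(!FallbackUtil.hasFallback(plan))
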