[GLUTEN-6067][CH][MINOR][UT] Follow-up to 6623: fix backends-clickhouse UT issue in CI #6891

Merged (8 commits) on Aug 17, 2024.

@@ -188,20 +188,33 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
     var metadataGlutenExist: Boolean = false
     var metadataBinExist: Boolean = false
     var dataBinExist: Boolean = false
+    var hasCommits = false
     client
       .listObjects(args)
       .forEach(
         obj => {
           objectCount += 1
-          if (obj.get().objectName().contains("metadata.gluten")) {
+          val objectName = obj.get().objectName()
+          if (objectName.contains("metadata.gluten")) {
             metadataGlutenExist = true
-          } else if (obj.get().objectName().contains("meta.bin")) {
+          } else if (objectName.contains("meta.bin")) {
             metadataBinExist = true
-          } else if (obj.get().objectName().contains("data.bin")) {
+          } else if (objectName.contains("data.bin")) {
             dataBinExist = true
+          } else if (objectName.contains("_commits")) {
+            // Spark 3.5 has a _commits directory:
+            // table/_delta_log/_commits/
+            hasCommits = true
           }
         })
-    assertResult(5)(objectCount)
+
+    if (isSparkVersionGE("3.5")) {
+      assertResult(6)(objectCount)
+      assert(hasCommits)
+    } else {
+      assertResult(5)(objectCount)
+    }
+
     assert(metadataGlutenExist)
     assert(metadataBinExist)
     assert(dataBinExist)
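The new assertion is gated on the runtime Spark version: on Spark 3.5 the Delta log gains a _commits directory (table/_delta_log/_commits/), so the bucket holds one extra object and hasCommits must be true. The isSparkVersionGE helper itself is not part of this diff; the sketch below only illustrates an assumed shape for such a major.minor version check, not the Gluten implementation.

    // Hedged sketch: compares the running Spark build's major.minor version
    // against a threshold such as "3.5". The real helper in Gluten's test
    // utilities may differ.
    import org.apache.spark.SPARK_VERSION

    def isSparkVersionGE(threshold: String): Boolean = {
      def majorMinor(version: String): (Int, Int) = version.split('.') match {
        case Array(major, minor, _*) => (major.toInt, minor.takeWhile(_.isDigit).toInt)
        case Array(major) => (major.toInt, 0)
      }
      val (runMajor, runMinor) = majorMinor(SPARK_VERSION)
      val (reqMajor, reqMinor) = majorMinor(threshold)
      runMajor > reqMajor || (runMajor == reqMajor && runMinor >= reqMinor)
    }
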
@@ -178,11 +178,13 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSuite
     super.beforeAll()
   }
 
-  protected val rootPath: String = this.getClass.getResource("/").getPath
-  protected val basePath: String = rootPath + "tests-working-home"
-  protected val warehouse: String = basePath + "/spark-warehouse"
-  protected val metaStorePathAbsolute: String = basePath + "/meta"
-  protected val hiveMetaStoreDB: String = metaStorePathAbsolute + "/metastore_db"
+  final protected val rootPath: String = this.getClass.getResource("/").getPath
+  final protected val basePath: String = rootPath + "tests-working-home"
+  final protected val warehouse: String = basePath + "/spark-warehouse"
+  final protected val metaStorePathAbsolute: String = basePath + "/meta"
+
+  protected val hiveMetaStoreDB: String =
+    s"$metaStorePathAbsolute/${getClass.getSimpleName}/metastore_db"
 
   final override protected val resourcePath: String = "" // ch not need this
   override protected val fileFormat: String = "parquet"
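The path constants become final, and hiveMetaStoreDB now embeds getClass.getSimpleName, so each suite that enables Hive support points at its own embedded Derby database instead of sharing .../meta/metastore_db. A shared Derby directory can only be held by one active metastore at a time, which is a common cause of flaky metastore lock failures when suites run back to back in CI. Below is a minimal sketch of how such a per-suite path is typically handed to the session, modeled on the javax.jdo.option.ConnectionURL setting that this PR removes from GlutenClickhouseFunctionSuite; the actual wiring in the shared base suite is not shown in this diff.

    // Hedged sketch, not the Gluten base-suite code: build a Hive-enabled
    // session whose embedded Derby metastore lives in a suite-specific
    // directory, so suites do not contend for the same metastore_db.
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    def buildSuiteSession(conf: SparkConf, hiveMetaStoreDB: String): SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .enableHiveSupport()
        .config(
          "javax.jdo.option.ConnectionURL",
          s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
        .getOrCreate()
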
@@ -16,7 +16,8 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.execution.AllDataTypesWithComplexType.genTestData
+import org.apache.gluten.test.AllDataTypesWithComplexType
+import org.apache.gluten.test.AllDataTypesWithComplexType.genTestData
 
 import org.apache.spark.SparkConf
 class GlutenClickhouseCountDistinctSuite extends GlutenClickHouseWholeStageTransformerSuite {
@@ -20,12 +20,6 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.utils.UTSystemParameters
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.delta.DeltaLog
-
-import org.apache.commons.io.FileUtils
-
-import java.io.File
 
 class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
   override protected val needCopyParquetToTablePath = true
@@ -39,9 +33,6 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
     createNotNullTPCHTablesInParquet(tablesPath)
   }
 
-  private var _hiveSpark: SparkSession = _
-  override protected def spark: SparkSession = _hiveSpark
-
   override protected def sparkConf: SparkConf = {
     new SparkConf()
       .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
@@ -69,70 +60,21 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
       .setMaster("local[1]")
   }
 
-  override protected def initializeSession(): Unit = {
-    if (_hiveSpark == null) {
-      val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
-      _hiveSpark = SparkSession
-        .builder()
-        .config(sparkConf)
-        .enableHiveSupport()
-        .config(
-          "javax.jdo.option.ConnectionURL",
-          s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
-        .getOrCreate()
-    }
-  }
-
-  override def beforeAll(): Unit = {
-    // prepare working paths
-    val basePathDir = new File(basePath)
-    if (basePathDir.exists()) {
-      FileUtils.forceDelete(basePathDir)
-    }
-    FileUtils.forceMkdir(basePathDir)
-    FileUtils.forceMkdir(new File(warehouse))
-    FileUtils.forceMkdir(new File(metaStorePathAbsolute))
-    FileUtils.copyDirectory(new File(rootPath + resourcePath), new File(tablesPath))
-    super.beforeAll()
-  }
-
-  override protected def afterAll(): Unit = {
-    DeltaLog.clearCache()
-
-    try {
-      super.afterAll()
-    } finally {
-      try {
-        if (_hiveSpark != null) {
-          try {
-            _hiveSpark.sessionState.catalog.reset()
-          } finally {
-            _hiveSpark.stop()
-            _hiveSpark = null
-          }
-        }
-      } finally {
-        SparkSession.clearActiveSession()
-        SparkSession.clearDefaultSession()
-      }
-    }
-  }
-
   test("test uuid - write and read") {
     withSQLConf(
       ("spark.gluten.sql.native.writer.enabled", "true"),
       (GlutenConfig.GLUTEN_ENABLED.key, "true")) {
-
-      spark.sql("drop table if exists uuid_test")
-      spark.sql("create table if not exists uuid_test (id string) stored as parquet")
+      withTable("uuid_test") {
+        spark.sql("create table if not exists uuid_test (id string) using parquet")
 
-      val df = spark.sql("select regexp_replace(uuid(), '-', '') as id from range(1)")
-      df.cache()
-      df.write.insertInto("uuid_test")
+        val df = spark.sql("select regexp_replace(uuid(), '-', '') as id from range(1)")
+        df.cache()
+        df.write.insertInto("uuid_test")
 
-      val df2 = spark.table("uuid_test")
-      val diffCount = df.exceptAll(df2).count()
-      assert(diffCount == 0)
+        val df2 = spark.table("uuid_test")
+        val diffCount = df.exceptAll(df2).count()
+        assert(diffCount == 0)
+      }
     }
   }
 
@@ -181,49 +123,51 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
   }
 
   test("GLUTEN-5981 null value from get_json_object") {
-    spark.sql("create table json_t1 (a string) using parquet")
-    spark.sql("insert into json_t1 values ('{\"a\":null}')")
-    runQueryAndCompare(
-      """
-        |SELECT get_json_object(a, '$.a') is null from json_t1
-        |""".stripMargin
-    )(df => checkFallbackOperators(df, 0))
-    spark.sql("drop table json_t1")
+    withTable("json_t1") {
+      spark.sql("create table json_t1 (a string) using parquet")
+      spark.sql("insert into json_t1 values ('{\"a\":null}')")
+      runQueryAndCompare(
+        """
+          |SELECT get_json_object(a, '$.a') is null from json_t1
+          |""".stripMargin
+      )(df => checkFallbackOperators(df, 0))
+    }
   }
 
   test("Fix arrayDistinct(Array(Nullable(Decimal))) core dump") {
-    val create_sql =
-      """
-        |create table if not exists test(
-        | dec array<decimal(10, 2)>
-        |) using parquet
-        |""".stripMargin
-    val fill_sql =
-      """
-        |insert into test values(array(1, 2, null)), (array(null, 2,3, 5))
-        |""".stripMargin
-    val query_sql =
-      """
-        |select array_distinct(dec) from test;
-        |""".stripMargin
-    spark.sql(create_sql)
-    spark.sql(fill_sql)
-    compareResultsAgainstVanillaSpark(query_sql, true, { _ => })
-    spark.sql("drop table test")
withTable("json_t1") {
+      val create_sql =
+        """
+          |create table if not exists test(
+          | dec array<decimal(10, 2)>
+          |) using parquet
+          |""".stripMargin
+      val fill_sql =
+        """
+          |insert into test values(array(1, 2, null)), (array(null, 2,3, 5))
+          |""".stripMargin
+      val query_sql =
+        """
+          |select array_distinct(dec) from test;
+          |""".stripMargin
+      spark.sql(create_sql)
+      spark.sql(fill_sql)
+      compareResultsAgainstVanillaSpark(query_sql, true, { _ => })
+    }
   }
 
   test("intersect all") {
-    spark.sql("create table t1 (a int, b string) using parquet")
-    spark.sql("insert into t1 values (1, '1'),(2, '2'),(3, '3'),(4, '4'),(5, '5'),(6, '6')")
-    spark.sql("create table t2 (a int, b string) using parquet")
-    spark.sql("insert into t2 values (4, '4'),(5, '5'),(6, '6'),(7, '7'),(8, '8'),(9, '9')")
-    runQueryAndCompare(
-      """
-        |SELECT a,b FROM t1 INTERSECT ALL SELECT a,b FROM t2
-        |""".stripMargin
-    )(df => checkFallbackOperators(df, 0))
-    spark.sql("drop table t1")
-    spark.sql("drop table t2")
+    withTable("t1", "t2") {
+      spark.sql("create table t1 (a int, b string) using parquet")
+      spark.sql("insert into t1 values (1, '1'),(2, '2'),(3, '3'),(4, '4'),(5, '5'),(6, '6')")
+      spark.sql("create table t2 (a int, b string) using parquet")
+      spark.sql("insert into t2 values (4, '4'),(5, '5'),(6, '6'),(7, '7'),(8, '8'),(9, '9')")
+      runQueryAndCompare(
+        """
+          |SELECT a,b FROM t1 INTERSECT ALL SELECT a,b FROM t2
+          |""".stripMargin
+      )(df => checkFallbackOperators(df, 0))
+    }
   }
 
   test("array decimal32 CH column to row") {
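
Each of the rewritten tests above now creates its tables inside withTable(...), so the tables are dropped even when an assertion fails and no longer leak into later tests, which is what made these suites order-dependent in CI. For context, the helper, typically inherited from Spark's SQLTestUtils through the shared test traits, behaves roughly like the sketch below; the exact signature here is assumed rather than copied from the source.

    // Rough sketch of withTable: run the test body, then always drop the
    // listed tables, even if the body throws an exception. `spark` is the
    // suite's active SparkSession.
    protected def withTable(tableNames: String*)(body: => Unit): Unit = {
      try body
      finally {
        tableNames.foreach(name => spark.sql(s"DROP TABLE IF EXISTS $name"))
      }
    }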