From 620140825968d491c653da5f9991f9bb2e6354e1 Mon Sep 17 00:00:00 2001 From: Zhichao Zhang Date: Tue, 5 Dec 2023 14:14:46 +0800 Subject: [PATCH] [GLUTEN-3922][CH] Fix incorrect shuffle hash id value when executing modulo Fix incorrect shuffle hash id value when executing modulo. In CH Backend, the data type of the shuffle split num is a UInt32 and the returned type of the hash function is a UInt64, when the returned value of the hash function is more than 2^31 - 1, the modulo value of the hash value and the shuffle split num is different from the one of the vanilla spark. Close #3922. --- ...utenClickHouseTPCHParquetBucketSuite.scala | 70 ++++++++++++++++++- .../local-engine/Shuffle/SelectorBuilder.cpp | 12 +++- ...ClickHouseRSSColumnarShuffleAQESuite.scala | 6 +- 3 files changed, 83 insertions(+), 5 deletions(-) diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala index 2f0011156b900..e840cde6e99b9 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala @@ -87,6 +87,27 @@ class GlutenClickHouseTPCHParquetBucketSuite | CLUSTERED BY (c_custkey) SORTED BY (c_custkey) INTO 2 BUCKETS; |""".stripMargin) + val customerData1 = bucketTableDataPath + "/customer_6_buckets" + spark.sql(s"DROP TABLE IF EXISTS customer_6_buckets") + spark.sql(s""" + | CREATE EXTERNAL TABLE IF NOT EXISTS customer_6_buckets ( + | c_custkey bigint, + | c_name string, + | c_address string, + | c_nationkey bigint, + | c_phone string, + | c_acctbal double, + | c_mktsegment string, + | c_comment string) + | USING PARQUET + | LOCATION '$customerData1' + | CLUSTERED BY (c_custkey) SORTED BY (c_custkey) INTO 6 BUCKETS; + |""".stripMargin) + + spark.sql(s""" + |INSERT INTO customer_6_buckets SELECT * FROM customer; + |""".stripMargin) + val lineitemData = bucketTableDataPath + "/lineitem" spark.sql(s"DROP TABLE IF EXISTS lineitem") spark.sql(s""" @@ -155,6 +176,28 @@ class GlutenClickHouseTPCHParquetBucketSuite | CLUSTERED BY (o_orderkey) SORTED BY (o_orderkey, o_orderdate) INTO 2 BUCKETS; |""".stripMargin) + val ordersData1 = bucketTableDataPath + "/orders_6_buckets" + spark.sql(s"DROP TABLE IF EXISTS orders_6_buckets") + spark.sql(s""" + | CREATE EXTERNAL TABLE IF NOT EXISTS orders_6_buckets ( + | o_orderkey bigint, + | o_custkey bigint, + | o_orderstatus string, + | o_totalprice double, + | o_orderdate date, + | o_orderpriority string, + | o_clerk string, + | o_shippriority bigint, + | o_comment string) + | USING PARQUET + | LOCATION '$ordersData1' + | CLUSTERED BY (o_orderkey) SORTED BY (o_orderkey, o_orderdate) INTO 6 BUCKETS; + |""".stripMargin) + + spark.sql(s""" + |INSERT INTO orders_6_buckets SELECT * FROM orders; + |""".stripMargin) + val partData = bucketTableDataPath + "/part" spark.sql(s"DROP TABLE IF EXISTS part") spark.sql(s""" @@ -208,7 +251,7 @@ class GlutenClickHouseTPCHParquetBucketSuite | show tables; |""".stripMargin) .collect() - assert(result.length == 8) + assert(result.length == 10) } test("TPCH Q1") { @@ -498,5 +541,30 @@ class GlutenClickHouseTPCHParquetBucketSuite } ) } + + test("GLUTEN-3922: Fix incorrect shuffle hash id value when executing modulo") { + val SQL = + """ + |SELECT + | c_custkey, o_custkey, hash(o_custkey), pmod(hash(o_custkey), 12), + | pmod(hash(o_custkey), 4) + |FROM + | customer_6_buckets, + | orders_6_buckets + |WHERE + | c_mktsegment = 'BUILDING' + | AND c_custkey = o_custkey + | AND o_orderdate < date'1995-03-15' + |ORDER BY + | o_custkey DESC, + | c_custkey + |LIMIT 100; + |""".stripMargin + compareResultsAgainstVanillaSpark( + SQL, + true, + df => {} + ) + } } // scalastyle:on line.size.limit diff --git a/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp b/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp index f385d8a8d2366..6d6cae0a8dc3c 100644 --- a/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp +++ b/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp @@ -123,8 +123,18 @@ PartitionInfo HashSelectorBuilder::build(DB::Block & block) } else { + auto parts_num_int32 = static_cast(parts_num); for (size_t i = 0; i < rows; i++) - partition_ids.emplace_back(static_cast(hash_column->get64(i) % parts_num)); + { + // cast to int32 to be the same as the data type of the vanilla spark + auto hash_int32 = static_cast(hash_column->get64(i)); + auto res = hash_int32 % parts_num_int32; + if (res < 0) + { + res += parts_num_int32; + } + partition_ids.emplace_back(static_cast(res)); + } } return PartitionInfo::fromSelector(std::move(partition_ids), parts_num); } diff --git a/gluten-celeborn/clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala b/gluten-celeborn/clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala index 5b283299ece55..f581f11280b4b 100644 --- a/gluten-celeborn/clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala +++ b/gluten-celeborn/clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseRSSColumnarShuffleAQESuite.scala @@ -196,11 +196,11 @@ class GlutenClickHouseRSSColumnarShuffleAQESuite val res = spark.sql("select spark_partition_id(), id from t group by id").collect() assert(res.length == 3) assert(res(0).getInt(0) == 0) - assert(res(0).getInt(1) == 0) + assert(res(0).getInt(1) == 2) assert(res(1).getInt(0) == 1) - assert(res(1).getInt(1) == 1) + assert(res(1).getInt(1) == 0) assert(res(2).getInt(0) == 2) - assert(res(2).getInt(1) == 2) + assert(res(2).getInt(1) == 1) } } }