From b305a8822c98948b57ba15809d6330159e9de0ff Mon Sep 17 00:00:00 2001 From: arnavb Date: Thu, 5 Dec 2024 13:07:51 +0000 Subject: [PATCH] update --- .../backendsapi/velox/VeloxBackend.scala | 3 + .../execution/RangeExecTransformer.scala | 70 +++++++------- .../SubstraitToVeloxPlanValidator.cc | 3 +- ep/build-velox/src/get_velox.sh | 2 +- .../columnarbatch/IndicatorVectorPool.java | 4 +- .../backendsapi/BackendSettingsApi.scala | 4 + .../execution/RangeExecBaseTransformer.scala | 34 ++++--- .../execution/GlutenSQLRangeExecSuite.scala | 94 +++++++++++++++++++ .../execution/GlutenSQLRangeExecSuite.scala | 94 +++++++++++++++++++ .../execution/GlutenSQLRangeExecSuite.scala | 94 +++++++++++++++++++ .../execution/GlutenSQLRangeExecSuite.scala | 94 +++++++++++++++++++ pom.xml | 2 +- .../sql/shims/spark33/SparkShimProvider.scala | 2 +- 13 files changed, 443 insertions(+), 57 deletions(-) create mode 100644 gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala create mode 100644 gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala create mode 100644 gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala create mode 100644 gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 45ffd7765299..ac574d0c7a7e 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -504,4 +504,7 @@ object VeloxBackendSettings extends BackendSettingsApi { override def supportColumnarArrowUdf(): Boolean = true override def needPreComputeRangeFrameBoundary(): Boolean = true + + override def supportRangeExec(): Boolean = true + } diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RangeExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RangeExecTransformer.scala index d7486fbf57bb..6614a3370ee7 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/RangeExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RangeExecTransformer.scala @@ -28,6 +28,19 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.utils.SparkArrowUtil import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +/** + * RangeExecTransformer is a concrete implementation of RangeExecBaseTransformer that + * executes the Range operation and supports columnar processing. It generates columnar + * batches for the specified range. + * + * @param start Starting value of the range. + * @param end Ending value of the range. + * @param step Step size for the range. + * @param numSlices Number of slices for partitioning the range. + * @param numElements Total number of elements in the range. + * @param outputAttributes Attributes defining the output schema of the operator. + * @param child Child SparkPlan nodes for this operator, if any. 
+ */ case class RangeExecTransformer( start: Long, end: Long, @@ -44,36 +57,37 @@ case class RangeExecTransformer( numElements, outputAttributes, child) { - // scalastyle:off println + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { - println(s"[arnavb] RangeExecBaseTransformer 47") if (start == end || (start < end ^ 0 < step)) { - println(s"[arnavb] RangeExecBaseTransformer 49") sparkContext.emptyRDD[ColumnarBatch] } else { - println(s"[arnavb] RangeExecBaseTransformer 52") - // scalastyle:off println - println(s"[arnavb] RangeExecBaseTransformer 55") sparkContext .parallelize(0 until numSlices, numSlices) .mapPartitionsWithIndex { (partitionIndex, _) => - println(s"[arnavb] RangeExecBaseTransformer 62") - val sessionLocalTimeZone = SQLConf.get.sessionLocalTimeZone val allocator = ArrowBufferAllocators.contextInstance() + val sessionLocalTimeZone = SQLConf.get.sessionLocalTimeZone val arrowSchema = SparkArrowUtil.toArrowSchema(schema, SQLConf.get.sessionLocalTimeZone) - val batchSize = 2 + + val batchSize = 1000 val safePartitionStart = start + step * (partitionIndex * numElements.toLong / numSlices) val safePartitionEnd = start + step * ((partitionIndex + 1) * numElements.toLong / numSlices) + /** + * Generates the columnar batches for the specified range. Each batch contains + * a subset of the range values, managed using Arrow column vectors. + */ val iterator = new Iterator[ColumnarBatch] { var current = safePartitionStart override def hasNext: Boolean = { - if (step > 0) current < safePartitionEnd - else current > safePartitionEnd + if (step > 0) + current < safePartitionEnd + else + current > safePartitionEnd } override def next(): ColumnarBatch = { @@ -85,46 +99,26 @@ case class RangeExecTransformer( val vectors = ArrowWritableColumnVector.allocateColumns(numRows, schema) for (i <- 0 until numRows) { - // val value = current + i * step - val value = 1 + val value = current + i * step vectors(0).putLong(i, value) } vectors.foreach(_.setValueCount(numRows)) - val batch = new ColumnarBatch(vectors.asInstanceOf[Array[ColumnVector]], numRows) current += numRows * step + + val batch = new ColumnarBatch(vectors.asInstanceOf[Array[ColumnVector]], numRows) val offloadedBatch = ColumnarBatches.offload(allocator, batch) - println(s"[arnavb] returning offloaded batch 94") - println(s"[arnavb] going to return the wrapped batch 110") offloadedBatch } - println(s"[arnavb] returning offloaded batch 112") } Iterators .wrap(iterator) - .recyclePayload(_.close()) .recyclePayload( batch => { - println(s"[arnavb] Closing batch with rows1.") batch.close() }) - .recyclePayload( - offloadedBatch => { - println(s"[arnavb] Closing batch with rows2.") - offloadedBatch.close() - }) - .recycleIterator( - () => { - println(s"[arnavb] Closing allocator for partition1.") - allocator.close() - try { - println(s"[arnavb] Closing iterator2.") - println(s"[arnavb] Iterator successfully closed3.") - } catch { - case e: Exception => - println(s"[arnavb] Error while closing iterator3: ${e.getMessage}") - throw e - } - }) + .recycleIterator { + allocator.close() + } .create() } @@ -132,6 +126,6 @@ case class RangeExecTransformer( } override protected def doExecute(): RDD[org.apache.spark.sql.catalyst.InternalRow] = { - throw new UnsupportedOperationException("Custom execution is not implemented yet.") + throw new UnsupportedOperationException("doExecute is not supported for this operator") } } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 0ea614819cf8..3b74caf8ba5a 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -26,7 +26,6 @@
 #include "velox/exec/Aggregate.h"
 #include "velox/expression/Expr.h"
 #include "velox/expression/SignatureBinder.h"
-#include <iostream>
 
 namespace gluten {
 
@@ -62,6 +61,7 @@ static const std::unordered_set<std::string> kRegexFunctions = {
 
 static const std::unordered_set<std::string> kBlackList = {
     "split_part",
+    "factorial",
     "concat_ws",
     "from_json",
     "json_array_length",
@@ -206,7 +206,6 @@ bool SubstraitToVeloxPlanValidator::validateScalarFunction(
   }
 
   if (kBlackList.find(name) != kBlackList.end()) {
-    std::cout<<"Going to check the blacklist for the following function: "<< name<< std::endl;
     LOG_VALIDATION_MSG("Function is not supported: " + name);
     return false;
   }
diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh
index 1d3a84d858a8..422d96bb0d83 100755
--- a/ep/build-velox/src/get_velox.sh
+++ b/ep/build-velox/src/get_velox.sh
@@ -101,7 +101,7 @@ function process_setup_centos9 {
   ensure_pattern_matched 'dnf_install' scripts/setup-centos9.sh
   sed -i 's/dnf_install ninja-build cmake curl ccache gcc-toolset-12 git/dnf_install ninja-build cmake curl ccache gcc-toolset-12/' scripts/setup-centos9.sh
   sed -i '/^.*dnf_install autoconf/a\  dnf_install libxml2-devel libgsasl-devel libuuid-devel' scripts/setup-centos9.sh
-  
+
   ensure_pattern_matched 'install_gflags' scripts/setup-centos9.sh
   sed -i '/^function install_gflags.*/i function install_openssl {\n  wget_and_untar https://github.com/openssl/openssl/releases/download/openssl-3.2.2/openssl-3.2.2.tar.gz openssl \n  ( cd ${DEPENDENCY_DIR}/openssl \n    ./config no-shared && make depend && make && sudo make install ) \n}\n' scripts/setup-centos9.sh
 
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java
index df334cea98ce..41c8cbdd4337 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java
@@ -27,8 +27,7 @@ public class IndicatorVectorPool implements TaskResource {
  private static final Logger LOG = LoggerFactory.getLogger(IndicatorVectorPool.class);
  // A pool for all alive indicator vectors. The reason we adopt the pool
  // is, we don't want one native columnar batch (which is located via the
- // long int handle through JNI bridge) to be owned by more than one
- // IndicatorVector
+ // long int handle through JNI bridge) to be owned by more than one IndicatorVector
  // instance so release method of the native columnar batch could be guaranteed
  // to be called and only called once.
  private final Map<Long, IndicatorVector> uniqueInstances = new ConcurrentHashMap<>();
@@ -37,7 +36,6 @@ public class IndicatorVectorPool implements TaskResource {
 
  @Override
  public void release() throws Exception {
-
    if (!uniqueInstances.isEmpty()) {
      LOG.warn(
          "There are still unreleased native columnar batches during ending the task."
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 1eb69da6e5f0..4eb9e62b7f61 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -128,4 +128,8 @@ trait BackendSettingsApi {
   def supportColumnarArrowUdf(): Boolean = false
 
   def needPreComputeRangeFrameBoundary(): Boolean = false
+
+  def supportRangeExec(): Boolean = {
+    false
+  }
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala
index 02e60cef5aa5..57db9ff99408 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala
@@ -22,7 +22,12 @@ import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.{LeafExecNode, RangeExec, SparkPlan}
 
-// scalastyle:off println
+
+/**
+ * Base class for the RangeExec transformation. Concrete implementations are
+ * provided by backends that support offloading RangeExec; currently the
+ * Velox backend supports it.
+ */
 abstract class RangeExecBaseTransformer(
     start: Long,
     end: Long,
@@ -34,32 +39,39 @@ abstract class RangeExecBaseTransformer(
   extends LeafExecNode
   with ValidatablePlan {
 
-  println(s"[arnavb] RangeExecBaseTransformer invoked 123")
-
-  override def output: Seq[Attribute] = outputAttributes
+  override def output: Seq[Attribute] = {
+    outputAttributes
+  }
 
   override protected def doValidateInternal(): ValidationResult = {
-    // scalastyle:off println
-    println("[arnavb] doValidateInternal called.")
+    val isSupported = BackendsApiManager.getSettings.supportRangeExec()
+
+    if (!isSupported) {
+      return ValidationResult.failed(
+        "RangeExec is not supported by the current backend."
+      )
+    }
     ValidationResult.succeeded
   }
 
-  override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType
+  override def batchType(): Convention.BatchType = {
+    BackendsApiManager.getSettings.primaryBatchType
+  }
 
   override def rowType0(): Convention.RowType = Convention.RowType.None
 
   override protected def doExecute()
      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
-    // scalastyle:off println
-    println("doexecute called.")
     throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
   }
 }
 
+/**
+ * Companion object for RangeExecBaseTransformer. Provides factory methods
+ * to create an instance from an existing RangeExec plan.
+ */
 object RangeExecBaseTransformer {
-  println(s"[arnavb] RangeExecBaseTransformer invoked")
   def from(rangeExec: RangeExec): RangeExecBaseTransformer = {
-    println(s"[arnavb] RangeExecBaseTransformer 1invoked")
     BackendsApiManager.getSparkPlanExecApiInstance
       .genRangeExecTransformer(
         rangeExec.start,
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala
new file mode 100644
index 000000000000..025fcb79574b
--- /dev/null
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution.RangeExecTransformer
+
+import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.Row
+
+class GlutenSQLRangeExecSuite extends GlutenSQLTestsTrait {
+
+  testGluten("RangeExecTransformer produces correct results") {
+    val df = spark.range(0, 10, 1)
+    val expectedData = (0L until 10L).map(Row(_)).toSeq
+    checkAnswer(df, expectedData)
+
+    assert(
+      getExecutedPlan(df).exists {
+        case _: RangeExecTransformer => true
+        case _ => false
+      }
+    )
+  }
+
+  testGluten("RangeExecTransformer with step") {
+    val df = spark.range(5, 15, 2)
+    val expectedData = Seq(5L, 7L, 9L, 11L, 13L).map(Row(_))
+    checkAnswer(df, expectedData)
+
+    assert(
+      getExecutedPlan(df).exists {
+        case _: RangeExecTransformer => true
+        case _ => false
+      }
+    )
+  }
+
+  testGluten("RangeExecTransformer with filter") {
+    val df = spark.range(0, 20, 1).filter("id % 3 == 0")
+    val expectedData = Seq(0L, 3L, 6L, 9L, 12L, 15L, 18L).map(Row(_))
+    checkAnswer(df, expectedData)
+
+    assert(
+      getExecutedPlan(df).exists {
+        case _: RangeExecTransformer => true
+        case _ => false
+      }
+    )
+  }
+
+  testGluten("RangeExecTransformer with aggregation") {
+    val df = spark.range(1, 6, 1)
+    val sumDf = df.agg(sum("id"))
+    val expectedData = Seq(Row(15L))
+    checkAnswer(sumDf, expectedData)
+
+    assert(
+      getExecutedPlan(sumDf).exists {
+        case _: RangeExecTransformer => true
+        case _ => false
+      }
+    )
+  }
+
+  testGluten("RangeExecTransformer with join") {
+    val df1 = spark.range(0, 5, 1).toDF("id1")
+    val df2 = spark.range(3, 8, 1).toDF("id2")
+    val joinDf = df1.join(df2, df1("id1") === df2("id2"))
+    val expectedData = Seq(Row(3L, 3L), Row(4L, 4L))
+    checkAnswer(joinDf, expectedData)
+
+    assert(
+      getExecutedPlan(joinDf).exists {
+        case _: RangeExecTransformer => true
+        case _ => false
+      }
+    )
+  }
+}
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala new file mode 100644 index 000000000000..0de0388953c4 --- /dev/null +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.gluten.execution.RangeExecTransformer + +import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.spark.sql.Row + +class GlutenSQLRangeExecSuite extends GlutenSQLTestsTrait { + + testGluten("RangeExecTransformer produces correct results") { + val df = spark.range(0, 10, 1) + val expectedData = (0L until 10L).map(Row(_)).toSeq + checkAnswer(df, expectedData) + + assert( + getExecutedPlan(df).exists { + case _: RangeExecTransformer => true + case _ => false + } + ) + } + + testGluten("RangeExecTransformer with step") { + val df = spark.range(5, 15, 2) + val expectedData = Seq(5L, 7L, 9L, 11L, 13L).map(Row(_)) + checkAnswer(df, expectedData) + + assert( + getExecutedPlan(df).exists { + case _: RangeExecTransformer => true + case _ => false + } + ) + } + + testGluten("RangeExecTransformer with filter") { + val df = spark.range(0, 20, 1).filter("id % 3 == 0") + val expectedData = Seq(0L, 3L, 6L, 9L, 12L, 15L, 18L).map(Row(_)) + checkAnswer(df, expectedData) + + assert( + getExecutedPlan(df).exists { + case _: RangeExecTransformer => true + case _ => false + } + ) + } + + testGluten("RangeExecTransformer with aggregation") { + val df = spark.range(1, 6, 1) + val sumDf = df.agg(sum("id")) + val expectedData = Seq(Row(15L)) + checkAnswer(sumDf, expectedData) + + assert( + getExecutedPlan(sumDf).exists { + case _: RangeExecTransformer => true + case _ => false + } + ) + } + + testGluten("RangeExecTransformer with join") { + val df1 = spark.range(0, 5, 1).toDF("id1") + val df2 = spark.range(3, 8, 1).toDF("id2") + val joinDf = df1.join(df2, df1("id1") === df2("id2")) + val expectedData = Seq(Row(3L, 3L), Row(4L, 4L)) + checkAnswer(joinDf, expectedData) + + assert( + getExecutedPlan(joinDf).exists { + case _: RangeExecTransformer => true + case _ => false + } + ) + } +} diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala new file mode 100644 index 000000000000..0de0388953c4 --- /dev/null +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.gluten.execution.RangeExecTransformer + +import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.spark.sql.Row + +class GlutenSQLRangeExecSuite extends GlutenSQLTestsTrait { + + testGluten("RangeExecTransformer produces correct results") { + val df = spark.range(0, 10, 1) + val expectedData = (0L until 10L).map(Row(_)).toSeq + checkAnswer(df, expectedData) + + assert( + getExecutedPlan(df).exists { + case _: RangeExecTransformer => true + case _ => false + } + ) + } + + testGluten("RangeExecTransformer with step") { + val df = spark.range(5, 15, 2) + val expectedData = Seq(5L, 7L, 9L, 11L, 13L).map(Row(_)) + checkAnswer(df, expectedData) + + assert( + getExecutedPlan(df).exists { + case _: RangeExecTransformer => true + case _ => false + } + ) + } + + testGluten("RangeExecTransformer with filter") { + val df = spark.range(0, 20, 1).filter("id % 3 == 0") + val expectedData = Seq(0L, 3L, 6L, 9L, 12L, 15L, 18L).map(Row(_)) + checkAnswer(df, expectedData) + + assert( + getExecutedPlan(df).exists { + case _: RangeExecTransformer => true + case _ => false + } + ) + } + + testGluten("RangeExecTransformer with aggregation") { + val df = spark.range(1, 6, 1) + val sumDf = df.agg(sum("id")) + val expectedData = Seq(Row(15L)) + checkAnswer(sumDf, expectedData) + + assert( + getExecutedPlan(sumDf).exists { + case _: RangeExecTransformer => true + case _ => false + } + ) + } + + testGluten("RangeExecTransformer with join") { + val df1 = spark.range(0, 5, 1).toDF("id1") + val df2 = spark.range(3, 8, 1).toDF("id2") + val joinDf = df1.join(df2, df1("id1") === df2("id2")) + val expectedData = Seq(Row(3L, 3L), Row(4L, 4L)) + checkAnswer(joinDf, expectedData) + + assert( + getExecutedPlan(joinDf).exists { + case _: RangeExecTransformer => true + case _ => false + } + ) + } +} diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala new file mode 100644 index 000000000000..0de0388953c4 --- /dev/null +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.gluten.execution.RangeExecTransformer + +import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.spark.sql.Row + +class GlutenSQLRangeExecSuite extends GlutenSQLTestsTrait { + + testGluten("RangeExecTransformer produces correct results") { + val df = spark.range(0, 10, 1) + val expectedData = (0L until 10L).map(Row(_)).toSeq + checkAnswer(df, expectedData) + + assert( + getExecutedPlan(df).exists { + case _: RangeExecTransformer => true + case _ => false + } + ) + } + + testGluten("RangeExecTransformer with step") { + val df = spark.range(5, 15, 2) + val expectedData = Seq(5L, 7L, 9L, 11L, 13L).map(Row(_)) + checkAnswer(df, expectedData) + + assert( + getExecutedPlan(df).exists { + case _: RangeExecTransformer => true + case _ => false + } + ) + } + + testGluten("RangeExecTransformer with filter") { + val df = spark.range(0, 20, 1).filter("id % 3 == 0") + val expectedData = Seq(0L, 3L, 6L, 9L, 12L, 15L, 18L).map(Row(_)) + checkAnswer(df, expectedData) + + assert( + getExecutedPlan(df).exists { + case _: RangeExecTransformer => true + case _ => false + } + ) + } + + testGluten("RangeExecTransformer with aggregation") { + val df = spark.range(1, 6, 1) + val sumDf = df.agg(sum("id")) + val expectedData = Seq(Row(15L)) + checkAnswer(sumDf, expectedData) + + assert( + getExecutedPlan(sumDf).exists { + case _: RangeExecTransformer => true + case _ => false + } + ) + } + + testGluten("RangeExecTransformer with join") { + val df1 = spark.range(0, 5, 1).toDF("id1") + val df2 = spark.range(3, 8, 1).toDF("id2") + val joinDf = df1.join(df2, df1("id1") === df2("id2")) + val expectedData = Seq(Row(3L, 3L), Row(4L, 4L)) + checkAnswer(joinDf, expectedData) + + assert( + getExecutedPlan(joinDf).exists { + case _: RangeExecTransformer => true + case _ => false + } + ) + } +} diff --git a/pom.xml b/pom.xml index 4df4f29d7431..8a2968b56905 100644 --- a/pom.xml +++ b/pom.xml @@ -307,7 +307,7 @@ 3.3 spark-sql-columnar-shims-spark33 - 3.3.2 + 3.3.1 1.3.1 delta-core diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/SparkShimProvider.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/SparkShimProvider.scala index a790f5758bc6..4ecf10f940df 100644 --- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/SparkShimProvider.scala +++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/SparkShimProvider.scala @@ -20,7 +20,7 @@ import org.apache.gluten.sql.shims.{SparkShimDescriptor, SparkShims} import org.apache.gluten.sql.shims.spark33.SparkShimProvider.DESCRIPTOR object SparkShimProvider { - val DESCRIPTOR = SparkShimDescriptor(3, 3, 2) + val DESCRIPTOR = SparkShimDescriptor(3, 3, 1) } class SparkShimProvider extends org.apache.gluten.sql.shims.SparkShimProvider {
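
A minimal sketch of how the new offload path can be exercised, assuming a spark-shell session launched with the Gluten plugin and the Velox backend enabled (the session value `spark` and the sample range bounds are illustrative, not part of the patch):

    // Vanilla Spark plans spark.range as RangeExec; with this patch applied and
    // supportRangeExec() returning true, planning replaces it with RangeExecTransformer.
    val df = spark.range(5, 15, 2)
    df.show()     // prints 5, 7, 9, 11, 13
    df.explain()  // the physical plan should show RangeExecTransformer instead of Range

The new GlutenSQLRangeExecSuite files above assert the same replacement through getExecutedPlan, alongside result checks for step, filter, aggregation, and join cases.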