
Commit

update
ArnavBalyan committed Dec 5, 2024
1 parent 56d532c commit b305a88
Showing 13 changed files with 443 additions and 57 deletions.
@@ -504,4 +504,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def supportColumnarArrowUdf(): Boolean = true

override def needPreComputeRangeFrameBoundary(): Boolean = true

override def supportRangeExec(): Boolean = true

}
@@ -28,6 +28,19 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.utils.SparkArrowUtil
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

/**
* RangeExecTransformer is a concrete implementation of RangeExecBaseTransformer that
* executes the Range operation and supports columnar processing. It generates columnar
* batches for the specified range.
*
* @param start Starting value of the range.
* @param end Ending value of the range.
* @param step Step size for the range.
* @param numSlices Number of slices for partitioning the range.
* @param numElements Total number of elements in the range.
* @param outputAttributes Attributes defining the output schema of the operator.
* @param child Child SparkPlan nodes for this operator, if any.
*/
case class RangeExecTransformer(
start: Long,
end: Long,
@@ -44,36 +57,37 @@ case class RangeExecTransformer(
numElements,
outputAttributes,
child) {

override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
println(s"[arnavb] RangeExecBaseTransformer 47")
if (start == end || (start < end ^ 0 < step)) {
println(s"[arnavb] RangeExecBaseTransformer 49")
sparkContext.emptyRDD[ColumnarBatch]
} else {
println(s"[arnavb] RangeExecBaseTransformer 52")
// scalastyle:off println
println(s"[arnavb] RangeExecBaseTransformer 55")
sparkContext
.parallelize(0 until numSlices, numSlices)
.mapPartitionsWithIndex {
(partitionIndex, _) =>
println(s"[arnavb] RangeExecBaseTransformer 62")
val sessionLocalTimeZone = SQLConf.get.sessionLocalTimeZone
val allocator = ArrowBufferAllocators.contextInstance()
val sessionLocalTimeZone = SQLConf.get.sessionLocalTimeZone
val arrowSchema = SparkArrowUtil.toArrowSchema(schema, SQLConf.get.sessionLocalTimeZone)
val batchSize = 2

val batchSize = 1000
val safePartitionStart =
start + step * (partitionIndex * numElements.toLong / numSlices)
val safePartitionEnd =
start + step * ((partitionIndex + 1) * numElements.toLong / numSlices)

/**
* Generates the columnar batches for the specified range. Each batch contains
* a subset of the range values, managed using Arrow column vectors.
*/
val iterator = new Iterator[ColumnarBatch] {
var current = safePartitionStart

override def hasNext: Boolean = {
if (step > 0) {
current < safePartitionEnd
} else {
current > safePartitionEnd
}
}

override def next(): ColumnarBatch = {
@@ -85,53 +99,33 @@ case class RangeExecTransformer(
val vectors = ArrowWritableColumnVector.allocateColumns(numRows, schema)

for (i <- 0 until numRows) {
val value = current + i * step
vectors(0).putLong(i, value)
}
vectors.foreach(_.setValueCount(numRows))
current += numRows * step

val batch = new ColumnarBatch(vectors.asInstanceOf[Array[ColumnVector]], numRows)
val offloadedBatch = ColumnarBatches.offload(allocator, batch)
println(s"[arnavb] returning offloaded batch 94")
println(s"[arnavb] going to return the wrapped batch 110")
offloadedBatch
}
println(s"[arnavb] returning offloaded batch 112")
}
Iterators
.wrap(iterator)
.recyclePayload(_.close())
.recycleIterator {
allocator.close()
}
.create()

}
}
}

override protected def doExecute(): RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException("Custom execution is not implemented yet.")
throw new UnsupportedOperationException("doExecute is not supported for this operator")
}
}
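For readers following the arithmetic in doExecuteColumnar above: each of the numSlices partitions covers an equal share of numElements, and that share is scaled by step and offset from start, which is exactly how safePartitionStart and safePartitionEnd are derived. A minimal standalone sketch of that calculation follows; the object name and the example values are illustrative only, not part of the commit.

object RangeSliceBoundariesSketch {
  // Mirrors the safePartitionStart / safePartitionEnd math in RangeExecTransformer:
  // slice i covers [start + step * (i * n / slices), start + step * ((i + 1) * n / slices)).
  def boundaries(start: Long, step: Long, numElements: Long, numSlices: Int, partitionIndex: Int): (Long, Long) = {
    val lo = start + step * (partitionIndex * numElements / numSlices)
    val hi = start + step * ((partitionIndex + 1) * numElements / numSlices)
    (lo, hi)
  }

  def main(args: Array[String]): Unit = {
    // Example: range(0, 10) with step 1 split into 4 slices.
    (0 until 4).foreach { p =>
      println(s"slice $p -> ${boundaries(0L, 1L, 10L, 4, p)}")
    }
    // Prints (0,2), (2,5), (5,7), (7,10): the slices tile [0, 10) with no gaps or overlaps.
  }
}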
3 changes: 1 addition & 2 deletions cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -26,7 +26,6 @@
#include "velox/exec/Aggregate.h"
#include "velox/expression/Expr.h"
#include "velox/expression/SignatureBinder.h"

namespace gluten {

@@ -62,6 +61,7 @@ static const std::unordered_set<std::string> kRegexFunctions = {

static const std::unordered_set<std::string> kBlackList = {
"split_part",
"factorial",
"concat_ws",
"from_json",
"json_array_length",
@@ -206,7 +206,6 @@ bool SubstraitToVeloxPlanValidator::validateScalarFunction(
}

if (kBlackList.find(name) != kBlackList.end()) {
LOG_VALIDATION_MSG("Function is not supported: " + name);
return false;
}
2 changes: 1 addition & 1 deletion ep/build-velox/src/get_velox.sh
@@ -101,7 +101,7 @@ function process_setup_centos9 {
ensure_pattern_matched 'dnf_install' scripts/setup-centos9.sh
sed -i 's/dnf_install ninja-build cmake curl ccache gcc-toolset-12 git/dnf_install ninja-build cmake curl ccache gcc-toolset-12/' scripts/setup-centos9.sh
sed -i '/^.*dnf_install autoconf/a\ dnf_install libxml2-devel libgsasl-devel libuuid-devel' scripts/setup-centos9.sh

ensure_pattern_matched 'install_gflags' scripts/setup-centos9.sh
sed -i '/^function install_gflags.*/i function install_openssl {\n wget_and_untar https://github.com/openssl/openssl/releases/download/openssl-3.2.2/openssl-3.2.2.tar.gz openssl \n ( cd ${DEPENDENCY_DIR}/openssl \n ./config no-shared && make depend && make && sudo make install ) \n}\n' scripts/setup-centos9.sh

@@ -27,8 +27,7 @@ public class IndicatorVectorPool implements TaskResource {
private static final Logger LOG = LoggerFactory.getLogger(IndicatorVectorPool.class);
// A pool for all alive indicator vectors. The reason we adopt the pool
// is, we don't want one native columnar batch (which is located via the
// long int handle through JNI bridge) to be owned by more than one IndicatorVector
// instance so release method of the native columnar batch could be guaranteed
// to be called and only called once.
private final Map<Long, IndicatorVector> uniqueInstances = new ConcurrentHashMap<>();
@@ -37,7 +36,6 @@ public class IndicatorVectorPool implements TaskResource {

@Override
public void release() throws Exception {

if (!uniqueInstances.isEmpty()) {
LOG.warn(
"There are still unreleased native columnar batches during ending the task."
@@ -128,4 +128,8 @@ trait BackendSettingsApi {
def supportColumnarArrowUdf(): Boolean = false

def needPreComputeRangeFrameBoundary(): Boolean = false

def supportRangeExec(): Boolean = {
false
}
}
@@ -22,7 +22,12 @@ import org.apache.gluten.extension.columnar.transition.Convention

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{LeafExecNode, RangeExec, SparkPlan}

/**
* Base class for RangeExec transformation, which can be implemented by the
* supported backends. Currently Velox is supported.
*/
abstract class RangeExecBaseTransformer(
start: Long,
end: Long,
@@ -34,32 +39,39 @@ abstract class RangeExecBaseTransformer(
extends LeafExecNode
with ValidatablePlan {

println(s"[arnavb] RangeExecBaseTransformer invoked 123")

override def output: Seq[Attribute] = outputAttributes
override def output: Seq[Attribute] = {
outputAttributes
}

override protected def doValidateInternal(): ValidationResult = {
val isSupported = BackendsApiManager.getSettings.supportRangeExec()

if (!isSupported) {
return ValidationResult.failed(
s"RangeExec is not supported by the current backend."
)
}
ValidationResult.succeeded
}

override def batchType(): Convention.BatchType = {
BackendsApiManager.getSettings.primaryBatchType
}

override def rowType0(): Convention.RowType = Convention.RowType.None

override protected def doExecute()
: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}
}

/**
* Companion object for RangeExecBaseTransformer; provides factory methods
* to create instances from an existing RangeExec plan.
*/
object RangeExecBaseTransformer {
println(s"[arnavb] RangeExecBaseTransformer invoked")
def from(rangeExec: RangeExec): RangeExecBaseTransformer = {
println(s"[arnavb] RangeExecBaseTransformer 1invoked")
BackendsApiManager.getSparkPlanExecApiInstance
.genRangeExecTransformer(
rangeExec.start,
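As a hedged illustration of how the factory above is meant to be used: a physical-plan rule could swap a vanilla RangeExec for the backend transformer whenever the backend reports support. The rule below is hypothetical and not part of the commit; only RangeExecBaseTransformer.from and supportRangeExec come from the diff, and the import paths are assumptions.

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.RangeExecBaseTransformer

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RangeExec, SparkPlan}

// Hypothetical offload rule (sketch only): replace RangeExec with the backend
// transformer when the backend advertises RangeExec support.
object OffloadRangeSketch extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case r: RangeExec if BackendsApiManager.getSettings.supportRangeExec() =>
      RangeExecBaseTransformer.from(r)
  }
}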
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.gluten.execution.RangeExecTransformer

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.sum

class GlutenSQLRangeExecSuite extends GlutenSQLTestsTrait {

testGluten("RangeExecTransformer produces correct results") {
val df = spark.range(0, 10, 1)
val expectedData = (0L until 10L).map(Row(_)).toSeq
checkAnswer(df, expectedData)

assert(
getExecutedPlan(df).exists {
case _: RangeExecTransformer => true
case _ => false
}
)
}

testGluten("RangeExecTransformer with step") {
val df = spark.range(5, 15, 2)
val expectedData = Seq(5L, 7L, 9L, 11L, 13L).map(Row(_))
checkAnswer(df, expectedData)

assert(
getExecutedPlan(df).exists {
case _: RangeExecTransformer => true
case _ => false
}
)
}

testGluten("RangeExecTransformer with filter") {
val df = spark.range(0, 20, 1).filter("id % 3 == 0")
val expectedData = Seq(0L, 3L, 6L, 9L, 12L, 15L, 18L).map(Row(_))
checkAnswer(df, expectedData)

assert(
getExecutedPlan(df).exists {
case _: RangeExecTransformer => true
case _ => false
}
)
}

testGluten("RangeExecTransformer with aggregation") {
val df = spark.range(1, 6, 1)
val sumDf = df.agg(sum("id"))
val expectedData = Seq(Row(15L))
checkAnswer(sumDf, expectedData)

assert(
getExecutedPlan(sumDf).exists {
case _: RangeExecTransformer => true
case _ => false
}
)
}

testGluten("RangeExecTransformer with join") {
val df1 = spark.range(0, 5, 1).toDF("id1")
val df2 = spark.range(3, 8, 1).toDF("id2")
val joinDf = df1.join(df2, df1("id1") === df2("id2"))
val expectedData = Seq(Row(3L, 3L), Row(4L, 4L))
checkAnswer(joinDf, expectedData)

assert(
getExecutedPlan(joinDf).exists {
case _: RangeExecTransformer => true
case _ => false
}
)
}
}
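Outside the suite, the same offload can be sanity-checked interactively. The helper below is only a sketch under the assumption that the SparkSession already has the Gluten plugin loaded and that adaptive execution does not rewrap this simple plan; it is not part of the committed tests.

import org.apache.gluten.execution.RangeExecTransformer

import org.apache.spark.sql.SparkSession

// Returns true when a simple range query ends up executed by RangeExecTransformer.
def rangeIsOffloaded(spark: SparkSession): Boolean = {
  val df = spark.range(0, 100, 1)
  df.collect() // force execution so the final physical plan is in place
  df.queryExecution.executedPlan.collect { case r: RangeExecTransformer => r }.nonEmpty
}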
