
Commit

Merge branch 'port_covariance' of github.com:huaxingao/arrow-datafusion-comet into port_covariance
Huaxin Gao committed Apr 10, 2024
2 parents 1ffff8b + 3c91603 commit e09e6a4
Showing 58 changed files with 4,173 additions and 2,836 deletions.
1 change: 1 addition & 0 deletions EXPRESSIONS.md
@@ -30,6 +30,7 @@ The following Spark expressions are currently available:
+ If
+ Cast
+ Coalesce
+ BloomFilterMightContain
+ Boolean functions
+ And
+ Or
51 changes: 0 additions & 51 deletions common/src/main/java/org/apache/comet/CometArrowStreamWriter.java

This file was deleted.

4 changes: 3 additions & 1 deletion common/src/main/java/org/apache/comet/parquet/TypeUtil.java
@@ -196,7 +196,9 @@ && isUnsignedIntTypeMatched(logicalTypeAnnotation, 64)) {
|| canReadAsBinaryDecimal(descriptor, sparkType)
|| sparkType == DataTypes.BinaryType
// for uuid, since iceberg maps uuid to StringType
|| sparkType == DataTypes.StringType) {
|| sparkType == DataTypes.StringType
&& logicalTypeAnnotation
instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation) {
return;
}
break;
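A hedged Scala restatement of the tightened check above, for readability: because && binds tighter than ||, a BINARY column is readable as Spark's StringType only when it carries the UUID logical type annotation (the Iceberg uuid-to-string case). The function name and parameters are illustrative, not part of the change.

import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation
import org.apache.spark.sql.types.{DataType, DataTypes}

// Sketch of the new condition: StringType is accepted for BINARY only with a UUID annotation.
def uuidReadableAsString(sparkType: DataType, annotation: LogicalTypeAnnotation): Boolean =
  sparkType == DataTypes.StringType &&
    annotation.isInstanceOf[UUIDLogicalTypeAnnotation]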
@@ -48,7 +48,7 @@ protected CometDecodedVector(ValueVector vector, Field valueField, boolean useDe
}

@Override
ValueVector getValueVector() {
public ValueVector getValueVector() {
return valueVector;
}

@@ -153,7 +153,7 @@ public ColumnVector getChild(int i) {
}

@Override
ValueVector getValueVector() {
public ValueVector getValueVector() {
return delegate.getValueVector();
}

@@ -163,7 +163,7 @@ public CometVector slice(int offset, int length) {
}

@Override
DictionaryProvider getDictionaryProvider() {
public DictionaryProvider getDictionaryProvider() {
return delegate.getDictionaryProvider();
}
}
@@ -58,7 +58,7 @@ public CometDictionaryVector(
}

@Override
DictionaryProvider getDictionaryProvider() {
public DictionaryProvider getDictionaryProvider() {
return this.provider;
}

@@ -138,7 +138,7 @@ public byte[] getBinary(int rowId) {
}

@Override
CDataDictionaryProvider getDictionaryProvider() {
public CDataDictionaryProvider getDictionaryProvider() {
return null;
}

4 changes: 2 additions & 2 deletions common/src/main/java/org/apache/comet/vector/CometVector.java
@@ -189,11 +189,11 @@ public void close() {
getValueVector().close();
}

DictionaryProvider getDictionaryProvider() {
public DictionaryProvider getDictionaryProvider() {
throw new UnsupportedOperationException("Not implemented");
}

abstract ValueVector getValueVector();
public abstract ValueVector getValueVector();

/**
* Returns a zero-copying new vector that contains the values from [offset, offset + length).
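The two accessors above move from package-private to public, so code outside org.apache.comet.vector can reach the underlying Arrow data. A minimal, hedged sketch of what that enables; the helper is hypothetical and assumes an existing CometVector instance.

import org.apache.arrow.vector.ValueVector
import org.apache.comet.vector.CometVector

// With getValueVector now public, external callers can inspect the wrapped Arrow vector.
def arrowValueCount(vector: CometVector): Int = {
  val arrow: ValueVector = vector.getValueVector
  arrow.getValueCount
}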
29 changes: 25 additions & 4 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -139,12 +139,13 @@ object CometConf {
.booleanConf
.createWithDefault(false)

val COMET_EXEC_BROADCAST_ENABLED: ConfigEntry[Boolean] =
val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
.doc(
"Whether to enable broadcasting for Comet native operators. By default, " +
"this config is false. Note that this feature is not fully supported yet " +
"and only enabled for test purpose.")
"Whether to force enabling broadcasting for Comet native operators. By default, " +
"this config is false. Comet broadcast feature will be enabled automatically by " +
"Comet extension. But for unit tests, we need this feature to force enabling it " +
"for invalid cases. So this config is only used for unit test.")
.booleanConf
.createWithDefault(false)

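A hedged sketch of how a unit test might set the renamed flag; it assumes COMET_EXEC_CONFIG_PREFIX resolves to spark.comet.exec, and the local session setup is illustrative only.

import org.apache.spark.sql.SparkSession

// Force-enable Comet broadcast (COMET_EXEC_BROADCAST_FORCE_ENABLED) for a test run.
val spark = SparkSession
  .builder()
  .master("local[1]")
  .config("spark.comet.exec.broadcast.enabled", "true")
  .getOrCreate()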
@@ -336,6 +337,26 @@ object CometConf {
"enabled when reading from Iceberg tables.")
.booleanConf
.createWithDefault(false)

val COMET_ROW_TO_COLUMNAR_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.rowToColumnar.enabled")
.internal()
.doc("""
|Whether to enable row to columnar conversion in Comet. When this is turned on, Comet will
|convert row-based operators in `spark.comet.rowToColumnar.supportedOperatorList` into
|columnar based before processing.""".stripMargin)
.booleanConf
.createWithDefault(false)

val COMET_ROW_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST: ConfigEntry[Seq[String]] =
conf("spark.comet.rowToColumnar.supportedOperatorList")
.doc(
"A comma-separated list of row-based operators that will be converted to columnar " +
"format when 'spark.comet.rowToColumnar.enabled' is true")
.stringConf
.toSequence
.createWithDefault(Seq("Range,InMemoryTableScan"))

}
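For the two new row-to-columnar entries above, a hedged usage sketch on a local session; the config keys are taken from the definitions above, everything else is illustrative.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
// Convert supported row-based operators (Range, InMemoryTableScan) to columnar
// batches before Comet processes them.
spark.conf.set("spark.comet.rowToColumnar.enabled", "true")
spark.conf.set("spark.comet.rowToColumnar.supportedOperatorList", "Range,InMemoryTableScan")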

object ConfigHelpers {
94 changes: 16 additions & 78 deletions common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
@@ -19,89 +19,23 @@

package org.apache.comet.vector

import java.io.OutputStream
import java.nio.channels.Channels

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider, Data}
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector._
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.spark.SparkException
import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.CometArrowStreamWriter

class NativeUtil {
import Utils._

private val allocator = new RootAllocator(Long.MaxValue)
private val dictionaryProvider: CDataDictionaryProvider = new CDataDictionaryProvider
private val importer = new ArrowImporter(allocator)

/**
* Serializes a list of `ColumnarBatch` into an output stream.
*
* @param batches
* the output batches, each batch is a list of Arrow vectors wrapped in `CometVector`
* @param out
* the output stream
*/
def serializeBatches(batches: Iterator[ColumnarBatch], out: OutputStream): Long = {
var writer: Option[CometArrowStreamWriter] = None
var rowCount = 0

batches.foreach { batch =>
val (fieldVectors, batchProviderOpt) = getBatchFieldVectors(batch)
val root = new VectorSchemaRoot(fieldVectors.asJava)
val provider = batchProviderOpt.getOrElse(dictionaryProvider)

if (writer.isEmpty) {
writer = Some(new CometArrowStreamWriter(root, provider, Channels.newChannel(out)))
writer.get.start()
writer.get.writeBatch()
} else {
writer.get.writeMoreBatch(root)
}

root.clear()
rowCount += batch.numRows()
}

writer.map(_.end())

rowCount
}

def getBatchFieldVectors(
batch: ColumnarBatch): (Seq[FieldVector], Option[DictionaryProvider]) = {
var provider: Option[DictionaryProvider] = None
val fieldVectors = (0 until batch.numCols()).map { index =>
batch.column(index) match {
case a: CometVector =>
val valueVector = a.getValueVector
if (valueVector.getField.getDictionary != null) {
if (provider.isEmpty) {
provider = Some(a.getDictionaryProvider)
} else {
if (provider.get != a.getDictionaryProvider) {
throw new SparkException(
"Comet execution only takes Arrow Arrays with the same dictionary provider")
}
}
}

getFieldVector(valueVector)

case c =>
throw new SparkException(
"Comet execution only takes Arrow Arrays, but got " +
s"${c.getClass}")
}
}
(fieldVectors, provider)
}

/**
* Exports a Comet `ColumnarBatch` into a list of memory addresses that can be consumed by the
* native execution.
@@ -199,15 +133,19 @@ class NativeUtil {

new ColumnarBatch(arrayVectors.toArray, maxNumRows)
}
}

object NativeUtil {
def rootAsBatch(arrowRoot: VectorSchemaRoot): ColumnarBatch = {
rootAsBatch(arrowRoot, null)
}

private def getFieldVector(valueVector: ValueVector): FieldVector = {
valueVector match {
case v @ (_: BitVector | _: TinyIntVector | _: SmallIntVector | _: IntVector |
_: BigIntVector | _: Float4Vector | _: Float8Vector | _: VarCharVector |
_: DecimalVector | _: DateDayVector | _: TimeStampMicroTZVector | _: VarBinaryVector |
_: FixedSizeBinaryVector | _: TimeStampMicroVector) =>
v.asInstanceOf[FieldVector]
case _ => throw new SparkException(s"Unsupported Arrow Vector: ${valueVector.getClass}")
def rootAsBatch(arrowRoot: VectorSchemaRoot, provider: DictionaryProvider): ColumnarBatch = {
val vectors = (0 until arrowRoot.getFieldVectors.size()).map { i =>
val vector = arrowRoot.getFieldVectors.get(i)
// Native shuffle always uses decimal128.
CometVector.getVector(vector, true, provider)
}
new ColumnarBatch(vectors.toArray, arrowRoot.getRowCount)
}
}
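The new NativeUtil.rootAsBatch helpers wrap each field vector of a VectorSchemaRoot into a CometVector (decimal128, as native shuffle requires) and assemble a ColumnarBatch; StreamReader below now delegates to them. A hedged usage sketch, assuming Arrow IPC data arrives on a channel; the reader setup is illustrative and not part of this change.

import java.nio.channels.ReadableByteChannel

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.ipc.ArrowStreamReader
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.vector.NativeUtil

// Read one Arrow IPC batch and expose it as a Comet ColumnarBatch.
def readOneBatch(channel: ReadableByteChannel): Option[ColumnarBatch] = {
  val allocator = new RootAllocator(Long.MaxValue)
  val reader = new ArrowStreamReader(channel, allocator)
  if (reader.loadNextBatch()) {
    // ArrowStreamReader is itself a DictionaryProvider, so dictionary-encoded
    // columns resolve through the second argument.
    Some(NativeUtil.rootAsBatch(reader.getVectorSchemaRoot, reader))
  } else {
    None
  }
}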
12 changes: 2 additions & 10 deletions common/src/main/scala/org/apache/comet/vector/StreamReader.scala
@@ -21,13 +21,11 @@ package org.apache.comet.vector

import java.nio.channels.ReadableByteChannel

import scala.collection.JavaConverters.collectionAsScalaIterableConverter

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel}
import org.apache.arrow.vector.ipc.message.MessageChannelReader
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* A reader that consumes Arrow data from an input channel, and produces Comet batches.
@@ -47,13 +45,7 @@ case class StreamReader(channel: ReadableByteChannel) extends AutoCloseable {
}

private def rootAsBatch(root: VectorSchemaRoot): ColumnarBatch = {
val columns = root.getFieldVectors.asScala.map { vector =>
// Native shuffle always uses decimal128.
CometVector.getVector(vector, true, arrowReader).asInstanceOf[ColumnVector]
}.toArray
val batch = new ColumnarBatch(columns)
batch.setNumRows(root.getRowCount)
batch
NativeUtil.rootAsBatch(root, arrowReader)
}

override def close(): Unit = {
(Remaining changed files not shown.)

0 comments on commit e09e6a4
