[GLUTEN-6067][CH] [Part 3-2] Basic support for Native Write in Spark 3.5 #6586

Merged: 5 commits, Jul 26, 2024
@@ -27,6 +27,9 @@ public class OperatorMetrics implements IOperatorMetrics {
  public JoinParams joinParams;
  public AggregationParams aggParams;

+ public long physicalWrittenBytes;
+ public long numWrittenFiles;
+
  /** Create an instance for operator metrics. */
  public OperatorMetrics(
  List<MetricsData> metricsList, JoinParams joinParams, AggregationParams aggParams) {
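
The two new fields expose the native writer's output counters next to the existing join and aggregation parameters. A minimal sketch of the producer side, assuming a helper that copies native counters into a parsed OperatorMetrics; the helper name, its inputs, and the import package are assumptions, only the two field names come from the diff above:

import org.apache.gluten.metrics.OperatorMetrics // assumed package

def fillWriteMetrics(
    om: OperatorMetrics,
    nativeWrittenBytes: Long,
    nativeWrittenFiles: Long): OperatorMetrics = {
  // Only these two assignments are grounded in the diff; the rest is illustrative scaffolding.
  om.physicalWrittenBytes = nativeWrittenBytes
  om.numWrittenFiles = nativeWrittenFiles
  om
}
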
@@ -19,11 +19,8 @@
  import org.apache.gluten.metrics.IMetrics;
  import org.apache.gluten.metrics.NativeMetrics;

- import org.apache.spark.sql.execution.utils.CHExecUtil;
- import org.apache.spark.sql.vectorized.ColumnVector;
  import org.apache.spark.sql.vectorized.ColumnarBatch;

- import java.io.IOException;
  import java.util.concurrent.atomic.AtomicBoolean;

  public class BatchIterator extends GeneralOutIterator {
@@ -43,8 +40,6 @@ public String getId() {

  private native boolean nativeHasNext(long nativeHandle);

- private native byte[] nativeNext(long nativeHandle);
-
  private native long nativeCHNext(long nativeHandle);

  private native void nativeClose(long nativeHandle);
@@ -54,22 +49,15 @@ public String getId() {
  private native String nativeFetchMetrics(long nativeHandle);

  @Override
- public boolean hasNextInternal() throws IOException {
+ public boolean hasNextInternal() {
  return nativeHasNext(handle);
  }

  @Override
- public ColumnarBatch nextInternal() throws IOException {
+ public ColumnarBatch nextInternal() {
  long block = nativeCHNext(handle);
  CHNativeBlock nativeBlock = new CHNativeBlock(block);
- int cols = nativeBlock.numColumns();
- ColumnVector[] columnVectors = new ColumnVector[cols];
- for (int i = 0; i < cols; i++) {
- columnVectors[i] =
- new CHColumnVector(
- CHExecUtil.inferSparkDataType(nativeBlock.getTypeByPosition(i)), block, i);
- }
- return new ColumnarBatch(columnVectors, nativeBlock.numRows());
+ return nativeBlock.toColumnarBatch();
  }

  @Override
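
With nextInternal now delegating to CHNativeBlock.toColumnarBatch(), BatchIterator stays a thin shim over the JNI handle. A hedged consumption sketch, assuming GeneralOutIterator wraps the *Internal methods behind the usual hasNext/next/close API and that the iterator comes from the createKernelWithBatchIterator call shown further down in this diff:

import org.apache.gluten.vectorized.BatchIterator // assumed package
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hedged sketch: drain a native result iterator and count rows.
def drain(batchIter: BatchIterator): Long = {
  var rows = 0L
  while (batchIter.hasNext) {
    val batch: ColumnarBatch = batchIter.next()
    rows += batch.numRows()
  }
  batchIter.close()
  rows
}
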
@@ -25,7 +25,7 @@

  public class CHColumnVector extends ColumnVector {
  private final int columnPosition;
- private long blockAddress;
+ private final long blockAddress;

  public CHColumnVector(DataType type, long blockAddress, int columnPosition) {
  super(type);
@@ -90,15 +90,13 @@ public static void closeFromColumnarBatch(ColumnarBatch cb) {
  }

  public ColumnarBatch toColumnarBatch() {
- ColumnVector[] vectors = new ColumnVector[numColumns()];
- for (int i = 0; i < numColumns(); i++) {
+ int numRows = numRows();
+ int cols = numColumns();
+ ColumnVector[] vectors = new ColumnVector[cols];
+ for (int i = 0; i < cols; i++) {
  vectors[i] =
  new CHColumnVector(CHExecUtil.inferSparkDataType(getTypeByPosition(i)), blockAddress, i);
  }
- int numRows = 0;
- if (numColumns() != 0) {
- numRows = numRows();
- }
  return new ColumnarBatch(vectors, numRows);
  }
  }
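
Centralizing batch construction here removes the duplicated type-inference loop that BatchIterator carried before, and the row and column counts are hoisted into locals so the JNI-backed accessors are called once per batch instead of once per column. A hedged usage sketch; blockAddress stands in for a handle returned by the native side (for example the value nativeCHNext hands back above), and the import package is an assumption:

import org.apache.gluten.vectorized.CHNativeBlock // assumed package
import org.apache.spark.sql.vectorized.ColumnarBatch

// Wrap a native block handle into a Spark ColumnarBatch via the refactored helper.
def wrap(blockAddress: Long): ColumnarBatch =
  new CHNativeBlock(blockAddress).toColumnarBatch()
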
@@ -28,6 +28,7 @@
  import org.apache.spark.SparkConf;
  import org.apache.spark.sql.internal.SQLConf;

+ import java.nio.charset.StandardCharsets;
  import java.util.Arrays;
  import java.util.List;
  import java.util.Map;
@@ -36,15 +37,12 @@
  import scala.Tuple2;
  import scala.collection.JavaConverters;

- public class CHNativeExpressionEvaluator {
- private final ExpressionEvaluatorJniWrapper jniWrapper;
+ public class CHNativeExpressionEvaluator extends ExpressionEvaluatorJniWrapper {

- public CHNativeExpressionEvaluator() {
- jniWrapper = new ExpressionEvaluatorJniWrapper();
- }
+ private CHNativeExpressionEvaluator() {}

  // Used to initialize the native computing.
- public void initNative(SparkConf conf) {
+ public static void initNative(SparkConf conf) {
  Tuple2<String, String>[] all = conf.getAll();
  Map<String, String> confMap =
  Arrays.stream(all).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
@@ -55,19 +53,19 @@ public void initNative(SparkConf conf) {
  // Get the customer config from SparkConf for each backend
  BackendsApiManager.getTransformerApiInstance().postProcessNativeConfig(nativeConfMap, prefix);

- jniWrapper.nativeInitNative(buildNativeConf(nativeConfMap));
+ nativeInitNative(buildNativeConf(nativeConfMap));
  }

- public void finalizeNative() {
- jniWrapper.nativeFinalizeNative();
+ public static void finalizeNative() {
+ nativeFinalizeNative();
  }

  // Used to validate the Substrait plan in native compute engine.
- public boolean doValidate(byte[] subPlan) {
- return jniWrapper.nativeDoValidate(subPlan);
+ public static boolean doValidate(byte[] subPlan) {
+ throw new UnsupportedOperationException("doValidate is not supported in Clickhouse Backend");
  }

- private byte[] buildNativeConf(Map<String, String> confs) {
+ private static byte[] buildNativeConf(Map<String, String> confs) {
  StringMapNode stringMapNode = ExpressionBuilder.makeStringMap(confs);
  AdvancedExtensionNode extensionNode =
  ExtensionBuilder.makeAdvancedExtension(
@@ -76,27 +74,28 @@ private byte[] buildNativeConf(Map<String, String> confs) {
  return PlanBuilder.makePlan(extensionNode).toProtobuf().toByteArray();
  }

- private Map<String, String> getNativeBackendConf() {
+ private static Map<String, String> getNativeBackendConf() {
  return GlutenConfig.getNativeBackendConf(
  BackendsApiManager.getSettings().getBackendConfigPrefix(), SQLConf.get().getAllConfs());
  }

  public static void injectWriteFilesTempPath(String path, String fileName) {
- throw new UnsupportedOperationException(
- "injectWriteFilesTempPath Not supported in CHNativeExpressionEvaluator");
+ ExpressionEvaluatorJniWrapper.injectWriteFilesTempPath(
+ CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(),
+ path.getBytes(StandardCharsets.UTF_8),
+ fileName.getBytes(StandardCharsets.UTF_8));
  }

  // Used by WholeStageTransform to create the native computing pipeline and
  // return a columnar result iterator.
- public BatchIterator createKernelWithBatchIterator(
+ public static BatchIterator createKernelWithBatchIterator(
  byte[] wsPlan,
  byte[][] splitInfo,
  List<GeneralInIterator> iterList,
  boolean materializeInput) {
- long allocId = CHNativeMemoryAllocators.contextInstance().getNativeInstanceId();
  long handle =
- jniWrapper.nativeCreateKernelWithIterator(
- allocId,
+ nativeCreateKernelWithIterator(
+ CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(),
  wsPlan,
  splitInfo,
  iterList.toArray(new GeneralInIterator[0]),
@@ -106,10 +105,10 @@ public BatchIterator createKernelWithBatchIterator(
  }

  // Only for UT.
- public BatchIterator createKernelWithBatchIterator(
+ public static BatchIterator createKernelWithBatchIterator(
  long allocId, byte[] wsPlan, byte[][] splitInfo, List<GeneralInIterator> iterList) {
  long handle =
- jniWrapper.nativeCreateKernelWithIterator(
+ nativeCreateKernelWithIterator(
  allocId,
  wsPlan,
  splitInfo,
@@ -119,7 +118,7 @@
  return createBatchIterator(handle);
  }

- private BatchIterator createBatchIterator(long nativeHandle) {
+ private static BatchIterator createBatchIterator(long nativeHandle) {
  return new BatchIterator(nativeHandle);
  }
  }
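
The evaluator is now a static facade over ExpressionEvaluatorJniWrapper, so call sites stop constructing throwaway instances, and injectWriteFilesTempPath becomes a real call instead of an UnsupportedOperationException. A hedged sketch of the per-task write setup; the staging directory, the UUID-based file name, and the import package are illustrative, and since the JNI call resolves the current task's native allocator id, the call presumably has to happen on the task thread that will execute the write:

import java.util.UUID

import org.apache.gluten.vectorized.CHNativeExpressionEvaluator // assumed package

// Hedged sketch: hand the native writer its staging location before running the write plan.
val stagingDir = "/tmp/gluten-native-write" // illustrative staging directory
val stagedFile = UUID.randomUUID().toString + ".parquet" // illustrative file name scheme
CHNativeExpressionEvaluator.injectWriteFilesTempPath(stagingDir, stagedFile)
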
@@ -24,26 +24,18 @@
  public class ExpressionEvaluatorJniWrapper {

  /** Call initNative to initialize native computing. */
- native void nativeInitNative(byte[] confAsPlan);
+ static native void nativeInitNative(byte[] confAsPlan);

  /** Call finalizeNative to finalize native computing. */
- native void nativeFinalizeNative();
-
- /**
- * Validate the Substrait plan in native compute engine.
- *
- * @param subPlan the Substrait plan in binary format.
- * @return whether the computing of this plan is supported in native.
- */
- native boolean nativeDoValidate(byte[] subPlan);
+ static native void nativeFinalizeNative();

  /**
  * Create a native compute kernel and return a columnar result iterator.
  *
  * @param allocatorId allocator id
  * @return iterator instance id
  */
- public native long nativeCreateKernelWithIterator(
+ public static native long nativeCreateKernelWithIterator(
  long allocatorId,
  byte[] wsPlan,
  byte[][] splitInfo,
@@ -52,9 +44,11 @@ public native long nativeCreateKernelWithIterator(
  boolean materializeInput);

  /**
- * Closes the projector referenced by nativeHandler.
+ * Set the temp path for writing files.
  *
- * @param nativeHandler nativeHandler that needs to be closed
+ * @param allocatorId allocator id for current task attempt(or thread)
+ * @param path the temp path for writing files
  */
- native void nativeClose(long nativeHandler);
+ public static native void injectWriteFilesTempPath(
+ long allocatorId, byte[] path, byte[] filename);
  }
@@ -18,20 +18,25 @@ package org.apache.gluten.backendsapi.clickhouse

  import org.apache.gluten.{CH_BRANCH, CH_COMMIT, GlutenConfig}
  import org.apache.gluten.backendsapi._
+ import org.apache.gluten.execution.WriteFilesExecTransformer
  import org.apache.gluten.expression.WindowFunctionsBuilder
  import org.apache.gluten.extension.ValidationResult
  import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
  import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._

  import org.apache.spark.SparkEnv
  import org.apache.spark.internal.Logging
+ import org.apache.spark.sql.catalyst.catalog.BucketSpec
  import org.apache.spark.sql.catalyst.expressions._
  import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
  import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
  import org.apache.spark.sql.execution.SparkPlan
  import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+ import org.apache.spark.sql.execution.datasources.FileFormat
+ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
+ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
  import org.apache.spark.sql.internal.SQLConf
- import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
+ import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, StructField, StructType}

  import java.util.Locale

@@ -187,6 +192,73 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
  }
  }

+ override def supportWriteFilesExec(
+ format: FileFormat,
+ fields: Array[StructField],
+ bucketSpec: Option[BucketSpec],
+ options: Map[String, String]): ValidationResult = {
+
+ def validateCompressionCodec(): Option[String] = {
+ // FIXME: verify Support compression codec
+ val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(options)
+ None
+ }
+
+ def validateFileFormat(): Option[String] = {
+ format match {
+ case _: ParquetFileFormat => None
+ case _: OrcFileFormat => None
+ case f: FileFormat => Some(s"Not support FileFormat: ${f.getClass.getSimpleName}")
+ }
+ }
+
+ // Validate if all types are supported.
+ def validateDateTypes(): Option[String] = {
+ None
+ }
+
+ def validateFieldMetadata(): Option[String] = {
+ // copy CharVarcharUtils.CHAR_VARCHAR_TYPE_STRING_METADATA_KEY
+ val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING"
+ fields
+ .find(_.metadata != Metadata.empty)
+ .filterNot(_.metadata.contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY))
+ .map {
+ filed =>
+ s"StructField contain the metadata information: $filed, metadata: ${filed.metadata}"
+ }
+ }
+ def validateWriteFilesOptions(): Option[String] = {
+ val maxRecordsPerFile = options
+ .get("maxRecordsPerFile")
+ .map(_.toLong)
+ .getOrElse(SQLConf.get.maxRecordsPerFile)
+ if (maxRecordsPerFile > 0) {
+ Some("Unsupported native write: maxRecordsPerFile not supported.")
+ } else {
+ None
+ }
+ }
+
+ def validateBucketSpec(): Option[String] = {
+ if (bucketSpec.nonEmpty) {
+ Some("Unsupported native write: bucket write is not supported.")
+ } else {
+ None
+ }
+ }
+
+ validateCompressionCodec()
+ .orElse(validateFileFormat())
+ .orElse(validateFieldMetadata())
+ .orElse(validateDateTypes())
+ .orElse(validateWriteFilesOptions())
+ .orElse(validateBucketSpec()) match {
+ case Some(reason) => ValidationResult.failed(reason)
+ case _ => ValidationResult.succeeded
+ }
+ }
+
  override def supportShuffleWithProject(
  outputPartitioning: Partitioning,
  child: SparkPlan): Boolean = {
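
supportWriteFilesExec is the gate that decides whether a WriteFiles node may go native; per the checks above, bucketed writes and a positive maxRecordsPerFile currently force a fallback, and only Parquet and ORC formats pass. A hedged sketch of how a caller might probe it; the ok()/reason() accessors on ValidationResult and the field/option values are assumptions, not taken from this diff:

import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{IntegerType, StringType, StructField}

// Hedged sketch: probe the new validation hook with an illustrative schema.
val fields = Array(StructField("id", IntegerType), StructField("name", StringType))
val result = CHBackendSettings.supportWriteFilesExec(
  new ParquetFileFormat(), fields, bucketSpec = None, options = Map.empty[String, String])
if (!result.ok()) {
  // e.g. "Unsupported native write: bucket write is not supported."
  println(s"falling back to vanilla Spark write: ${result.reason()}")
}
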
@@ -76,7 +76,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
  }
  .map(it => new ColumnarNativeIterator(it.asJava).asInstanceOf[GeneralInIterator])
  .asJava
- new CHNativeExpressionEvaluator().createKernelWithBatchIterator(
+ CHNativeExpressionEvaluator.createKernelWithBatchIterator(
  wsPlan,
  splitInfoByteArray,
  listIterator,
@@ -83,7 +83,7 @@ class CHListenerApi extends ListenerApi with Logging {
  val externalSortKey = s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings" +
  s".max_bytes_before_external_sort"
  if (conf.getLong(externalSortKey, -1) < 0) {
- if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
+ if (conf.getBoolean("spark.memory.offHeap.enabled", defaultValue = false)) {
  val memSize = JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size"))
  if (memSize > 0L) {
  val cores = conf.getInt("spark.executor.cores", 1).toLong
@@ -97,8 +97,7 @@
  // Load supported hive/python/scala udfs
  UDFMappings.loadFromSparkConf(conf)

- val initKernel = new CHNativeExpressionEvaluator()
- initKernel.initNative(conf)
+ CHNativeExpressionEvaluator.initNative(conf)

  // inject backend-specific implementations to override spark classes
  // FIXME: The following set instances twice in local mode?
@@ -110,7 +109,6 @@

  private def shutdown(): Unit = {
  CHBroadcastBuildSideCache.cleanAll()
- val kernel = new CHNativeExpressionEvaluator()
- kernel.finalizeNative()
+ CHNativeExpressionEvaluator.finalizeNative()
  }
  }
@@ -383,13 +383,13 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
  s"SampleTransformer metrics update is not supported in CH backend")
  }

- def genWriteFilesTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
- throw new UnsupportedOperationException(
- s"WriteFilesTransformer metrics update is not supported in CH backend")
- }
+ def genWriteFilesTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+ Map(
+ "physicalWrittenBytes" -> SQLMetrics.createMetric(sparkContext, "number of written bytes"),
+ "numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files")
+ )

  def genWriteFilesTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater = {
- throw new UnsupportedOperationException(
- s"WriteFilesTransformer metrics update is not supported in CH backend")
+ new WriteFilesMetricsUpdater(metrics)
  }
  }
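
genWriteFilesTransformerMetricsUpdater now returns a WriteFilesMetricsUpdater, whose implementation is not part of this excerpt. A minimal sketch of what such an updater could look like, assuming Gluten's MetricsUpdater trait exposes an updateNativeMetrics(opMetrics: IOperatorMetrics) hook and that the metric keys match the map registered above:

import org.apache.gluten.metrics.{IOperatorMetrics, MetricsUpdater, OperatorMetrics} // assumed packages
import org.apache.spark.sql.execution.metric.SQLMetric

// Hedged sketch, not the class merged by this PR: copy the native writer's counters
// (the two OperatorMetrics fields added at the top of this diff) into the SQL metrics.
class WriteFilesMetricsUpdaterSketch(metrics: Map[String, SQLMetric]) extends MetricsUpdater {
  override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
    val om = opMetrics.asInstanceOf[OperatorMetrics]
    metrics("physicalWrittenBytes") += om.physicalWrittenBytes
    metrics("numWrittenFiles") += om.numWrittenFiles
  }
}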