[GLUTEN-6067][CH] [Part 3-2] Basic support for Native Write in Spark 3.5 (#6586)

* 1. Reduce duplicated code: BatchIterator::nextInternal now calls CHNativeBlock::toColumnarBatch() to return a ColumnarBatch
2. Extract a new function, SerializedPlanParser::buildPipeline, for use in follow-up PRs
3. Refactor the file wrapper; extract create_output_format_file for later use
4. Add GLUTEN_SOURCE_DIR so that gtest can read Java resources
5. Add SubstraitParserUtils.h so that parseJson can be removed
6. Many small refactors

* Make ExpressionEvaluatorJniWrapper static (see the call-site sketch below)

* Refactor GlutenClickHouseNativeWriteTableSuite and NativeWriteChecker

* Add BlockTypeUtils.h

* Support Native Write
baibaichen authored Jul 26, 2024
1 parent a7fb09e commit d90a7f4
Showing 61 changed files with 12,925 additions and 1,026 deletions.
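
For readers skimming the diffs below, a minimal Scala sketch of what the static-API change means at call sites (it mirrors the CHListenerApi edits in this commit); the import path is assumed and may differ from the actual package:

// Import path assumed from the surrounding code; adjust if the class lives elsewhere.
import org.apache.gluten.vectorized.CHNativeExpressionEvaluator
import org.apache.spark.SparkConf

object NativeEngineLifecycleSketch {
  def startAndStop(conf: SparkConf): Unit = {
    // Before this commit callers instantiated the evaluator:
    //   val kernel = new CHNativeExpressionEvaluator(); kernel.initNative(conf)
    // After this commit every entry point is static, as in the CHListenerApi diff below:
    CHNativeExpressionEvaluator.initNative(conf)
    // ... run native pipelines ...
    CHNativeExpressionEvaluator.finalizeNative()
  }
}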
@@ -27,6 +27,9 @@ public class OperatorMetrics implements IOperatorMetrics {
public JoinParams joinParams;
public AggregationParams aggParams;

public long physicalWrittenBytes;
public long numWrittenFiles;

/** Create an instance for operator metrics. */
public OperatorMetrics(
List<MetricsData> metricsList, JoinParams joinParams, AggregationParams aggParams) {
@@ -19,11 +19,8 @@
import org.apache.gluten.metrics.IMetrics;
import org.apache.gluten.metrics.NativeMetrics;

import org.apache.spark.sql.execution.utils.CHExecUtil;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class BatchIterator extends GeneralOutIterator {
@@ -43,8 +40,6 @@ public String getId() {

private native boolean nativeHasNext(long nativeHandle);

private native byte[] nativeNext(long nativeHandle);

private native long nativeCHNext(long nativeHandle);

private native void nativeClose(long nativeHandle);
@@ -54,22 +49,15 @@ public String getId() {
private native String nativeFetchMetrics(long nativeHandle);

@Override
public boolean hasNextInternal() throws IOException {
public boolean hasNextInternal() {
return nativeHasNext(handle);
}

@Override
public ColumnarBatch nextInternal() throws IOException {
public ColumnarBatch nextInternal() {
long block = nativeCHNext(handle);
CHNativeBlock nativeBlock = new CHNativeBlock(block);
int cols = nativeBlock.numColumns();
ColumnVector[] columnVectors = new ColumnVector[cols];
for (int i = 0; i < cols; i++) {
columnVectors[i] =
new CHColumnVector(
CHExecUtil.inferSparkDataType(nativeBlock.getTypeByPosition(i)), block, i);
}
return new ColumnarBatch(columnVectors, nativeBlock.numRows());
return nativeBlock.toColumnarBatch();
}

@Override
@@ -25,7 +25,7 @@

public class CHColumnVector extends ColumnVector {
private final int columnPosition;
private long blockAddress;
private final long blockAddress;

public CHColumnVector(DataType type, long blockAddress, int columnPosition) {
super(type);
@@ -90,15 +90,13 @@ public static void closeFromColumnarBatch(ColumnarBatch cb) {
}

public ColumnarBatch toColumnarBatch() {
ColumnVector[] vectors = new ColumnVector[numColumns()];
for (int i = 0; i < numColumns(); i++) {
int numRows = numRows();
int cols = numColumns();
ColumnVector[] vectors = new ColumnVector[cols];
for (int i = 0; i < cols; i++) {
vectors[i] =
new CHColumnVector(CHExecUtil.inferSparkDataType(getTypeByPosition(i)), blockAddress, i);
}
int numRows = 0;
if (numColumns() != 0) {
numRows = numRows();
}
return new ColumnarBatch(vectors, numRows);
}
}
@@ -28,6 +28,7 @@
import org.apache.spark.SparkConf;
import org.apache.spark.sql.internal.SQLConf;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -36,15 +37,12 @@
import scala.Tuple2;
import scala.collection.JavaConverters;

public class CHNativeExpressionEvaluator {
private final ExpressionEvaluatorJniWrapper jniWrapper;
public class CHNativeExpressionEvaluator extends ExpressionEvaluatorJniWrapper {

public CHNativeExpressionEvaluator() {
jniWrapper = new ExpressionEvaluatorJniWrapper();
}
private CHNativeExpressionEvaluator() {}

// Used to initialize the native computing.
public void initNative(SparkConf conf) {
public static void initNative(SparkConf conf) {
Tuple2<String, String>[] all = conf.getAll();
Map<String, String> confMap =
Arrays.stream(all).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
@@ -55,19 +53,19 @@ public void initNative(SparkConf conf) {
// Get the customer config from SparkConf for each backend
BackendsApiManager.getTransformerApiInstance().postProcessNativeConfig(nativeConfMap, prefix);

jniWrapper.nativeInitNative(buildNativeConf(nativeConfMap));
nativeInitNative(buildNativeConf(nativeConfMap));
}

public void finalizeNative() {
jniWrapper.nativeFinalizeNative();
public static void finalizeNative() {
nativeFinalizeNative();
}

// Used to validate the Substrait plan in native compute engine.
public boolean doValidate(byte[] subPlan) {
return jniWrapper.nativeDoValidate(subPlan);
public static boolean doValidate(byte[] subPlan) {
throw new UnsupportedOperationException("doValidate is not supported in Clickhouse Backend");
}

private byte[] buildNativeConf(Map<String, String> confs) {
private static byte[] buildNativeConf(Map<String, String> confs) {
StringMapNode stringMapNode = ExpressionBuilder.makeStringMap(confs);
AdvancedExtensionNode extensionNode =
ExtensionBuilder.makeAdvancedExtension(
@@ -76,27 +74,28 @@ private byte[] buildNativeConf(Map<String, String> confs) {
return PlanBuilder.makePlan(extensionNode).toProtobuf().toByteArray();
}

private Map<String, String> getNativeBackendConf() {
private static Map<String, String> getNativeBackendConf() {
return GlutenConfig.getNativeBackendConf(
BackendsApiManager.getSettings().getBackendConfigPrefix(), SQLConf.get().getAllConfs());
}

public static void injectWriteFilesTempPath(String path, String fileName) {
throw new UnsupportedOperationException(
"injectWriteFilesTempPath Not supported in CHNativeExpressionEvaluator");
ExpressionEvaluatorJniWrapper.injectWriteFilesTempPath(
CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(),
path.getBytes(StandardCharsets.UTF_8),
fileName.getBytes(StandardCharsets.UTF_8));
}

// Used by WholeStageTransform to create the native computing pipeline and
// return a columnar result iterator.
public BatchIterator createKernelWithBatchIterator(
public static BatchIterator createKernelWithBatchIterator(
byte[] wsPlan,
byte[][] splitInfo,
List<GeneralInIterator> iterList,
boolean materializeInput) {
long allocId = CHNativeMemoryAllocators.contextInstance().getNativeInstanceId();
long handle =
jniWrapper.nativeCreateKernelWithIterator(
allocId,
nativeCreateKernelWithIterator(
CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(),
wsPlan,
splitInfo,
iterList.toArray(new GeneralInIterator[0]),
@@ -106,10 +105,10 @@ public BatchIterator createKernelWithBatchIterator(
}

// Only for UT.
public BatchIterator createKernelWithBatchIterator(
public static BatchIterator createKernelWithBatchIterator(
long allocId, byte[] wsPlan, byte[][] splitInfo, List<GeneralInIterator> iterList) {
long handle =
jniWrapper.nativeCreateKernelWithIterator(
nativeCreateKernelWithIterator(
allocId,
wsPlan,
splitInfo,
@@ -119,7 +118,7 @@ public BatchIterator createKernelWithBatchIterator(
return createBatchIterator(handle);
}

private BatchIterator createBatchIterator(long nativeHandle) {
private static BatchIterator createBatchIterator(long nativeHandle) {
return new BatchIterator(nativeHandle);
}
}
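
A hedged usage sketch of the new injectWriteFilesTempPath entry point shown above; the staging directory and file name are hypothetical placeholders, and the import path is assumed:

// Import path assumed; the two-argument signature matches the diff above.
import org.apache.gluten.vectorized.CHNativeExpressionEvaluator

object WriteTempPathSketch {
  def prepareNativeWrite(): Unit = {
    // Hypothetical values: in practice the Spark write task supplies the staging
    // directory and target file name before the native pipeline starts writing.
    val tempPath = "/tmp/gluten-native-write-staging"
    val fileName = "part-00000-attempt-0.parquet"
    // Forwards the current task's native allocator id plus the UTF-8 bytes of both
    // strings to ExpressionEvaluatorJniWrapper.injectWriteFilesTempPath (see diff above).
    CHNativeExpressionEvaluator.injectWriteFilesTempPath(tempPath, fileName)
  }
}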
@@ -24,26 +24,18 @@
public class ExpressionEvaluatorJniWrapper {

/** Call initNative to initialize native computing. */
native void nativeInitNative(byte[] confAsPlan);
static native void nativeInitNative(byte[] confAsPlan);

/** Call finalizeNative to finalize native computing. */
native void nativeFinalizeNative();

/**
* Validate the Substrait plan in native compute engine.
*
* @param subPlan the Substrait plan in binary format.
* @return whether the computing of this plan is supported in native.
*/
native boolean nativeDoValidate(byte[] subPlan);
static native void nativeFinalizeNative();

/**
* Create a native compute kernel and return a columnar result iterator.
*
* @param allocatorId allocator id
* @return iterator instance id
*/
public native long nativeCreateKernelWithIterator(
public static native long nativeCreateKernelWithIterator(
long allocatorId,
byte[] wsPlan,
byte[][] splitInfo,
@@ -52,9 +44,11 @@ public native long nativeCreateKernelWithIterator(
boolean materializeInput);

/**
* Closes the projector referenced by nativeHandler.
* Set the temp path for writing files.
*
* @param nativeHandler nativeHandler that needs to be closed
* @param allocatorId allocator id for the current task attempt (or thread)
* @param path the temp path for writing files
*/
native void nativeClose(long nativeHandler);
public static native void injectWriteFilesTempPath(
long allocatorId, byte[] path, byte[] filename);
}
@@ -18,20 +18,25 @@ package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.{CH_BRANCH, CH_COMMIT, GlutenConfig}
import org.apache.gluten.backendsapi._
import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.WindowFunctionsBuilder
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, StructField, StructType}

import java.util.Locale

@@ -187,6 +192,73 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}
}

override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
options: Map[String, String]): ValidationResult = {

def validateCompressionCodec(): Option[String] = {
// FIXME: verify Support compression codec
val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(options)
None
}

def validateFileFormat(): Option[String] = {
format match {
case _: ParquetFileFormat => None
case _: OrcFileFormat => None
case f: FileFormat => Some(s"Not support FileFormat: ${f.getClass.getSimpleName}")
}
}

// Validate if all types are supported.
def validateDateTypes(): Option[String] = {
None
}

def validateFieldMetadata(): Option[String] = {
// copy CharVarcharUtils.CHAR_VARCHAR_TYPE_STRING_METADATA_KEY
val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING"
fields
.find(_.metadata != Metadata.empty)
.filterNot(_.metadata.contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY))
.map {
filed =>
s"StructField contain the metadata information: $filed, metadata: ${filed.metadata}"
}
}
def validateWriteFilesOptions(): Option[String] = {
val maxRecordsPerFile = options
.get("maxRecordsPerFile")
.map(_.toLong)
.getOrElse(SQLConf.get.maxRecordsPerFile)
if (maxRecordsPerFile > 0) {
Some("Unsupported native write: maxRecordsPerFile not supported.")
} else {
None
}
}

def validateBucketSpec(): Option[String] = {
if (bucketSpec.nonEmpty) {
Some("Unsupported native write: bucket write is not supported.")
} else {
None
}
}

validateCompressionCodec()
.orElse(validateFileFormat())
.orElse(validateFieldMetadata())
.orElse(validateDateTypes())
.orElse(validateWriteFilesOptions())
.orElse(validateBucketSpec()) match {
case Some(reason) => ValidationResult.failed(reason)
case _ => ValidationResult.succeeded
}
}

override def supportShuffleWithProject(
outputPartitioning: Partitioning,
child: SparkPlan): Boolean = {
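
The validators in supportWriteFilesExec above each return Option[String] (Some(reason) on failure, None when the check passes) and are folded with orElse, so the first failure short-circuits into ValidationResult.failed. A self-contained sketch of the same pattern, with made-up validators and a made-up result type standing in for the Gluten classes:

// Illustration only: Check, Passed, Failed and both validators are invented for this
// sketch; in the diff the real result type is ValidationResult.
object WriteValidationSketch {
  sealed trait Check
  case object Passed extends Check
  final case class Failed(reason: String) extends Check

  def validateFormat(format: String): Option[String] =
    if (format == "parquet" || format == "orc") None
    else Some(s"Not support FileFormat: $format")

  def validateBucketing(bucketed: Boolean): Option[String] =
    if (bucketed) Some("Unsupported native write: bucket write is not supported.")
    else None

  // The first Some(reason) short-circuits the chain, exactly like the orElse chain
  // at the end of supportWriteFilesExec.
  def validateWrite(format: String, bucketed: Boolean): Check =
    validateFormat(format).orElse(validateBucketing(bucketed)) match {
      case Some(reason) => Failed(reason)
      case None => Passed
    }
}

// WriteValidationSketch.validateWrite("parquet", bucketed = false) => Passed
// WriteValidationSketch.validateWrite("csv", bucketed = false)     => Failed("Not support FileFormat: csv")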
@@ -76,7 +76,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
}
.map(it => new ColumnarNativeIterator(it.asJava).asInstanceOf[GeneralInIterator])
.asJava
new CHNativeExpressionEvaluator().createKernelWithBatchIterator(
CHNativeExpressionEvaluator.createKernelWithBatchIterator(
wsPlan,
splitInfoByteArray,
listIterator,
@@ -83,7 +83,7 @@ class CHListenerApi extends ListenerApi with Logging {
val externalSortKey = s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings" +
s".max_bytes_before_external_sort"
if (conf.getLong(externalSortKey, -1) < 0) {
if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
if (conf.getBoolean("spark.memory.offHeap.enabled", defaultValue = false)) {
val memSize = JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size"))
if (memSize > 0L) {
val cores = conf.getInt("spark.executor.cores", 1).toLong
@@ -97,8 +97,7 @@
// Load supported hive/python/scala udfs
UDFMappings.loadFromSparkConf(conf)

val initKernel = new CHNativeExpressionEvaluator()
initKernel.initNative(conf)
CHNativeExpressionEvaluator.initNative(conf)

// inject backend-specific implementations to override spark classes
// FIXME: The following set instances twice in local mode?
Expand All @@ -110,7 +109,6 @@ class CHListenerApi extends ListenerApi with Logging {

private def shutdown(): Unit = {
CHBroadcastBuildSideCache.cleanAll()
val kernel = new CHNativeExpressionEvaluator()
kernel.finalizeNative()
CHNativeExpressionEvaluator.finalizeNative()
}
}
@@ -383,13 +383,13 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
s"SampleTransformer metrics update is not supported in CH backend")
}

def genWriteFilesTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
throw new UnsupportedOperationException(
s"WriteFilesTransformer metrics update is not supported in CH backend")
}
def genWriteFilesTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"physicalWrittenBytes" -> SQLMetrics.createMetric(sparkContext, "number of written bytes"),
"numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files")
)

def genWriteFilesTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater = {
throw new UnsupportedOperationException(
s"WriteFilesTransformer metrics update is not supported in CH backend")
new WriteFilesMetricsUpdater(metrics)
}
}