Skip to content

Commit

Permalink
Refactor: reduce duplicated codes
Browse files Browse the repository at this point in the history
  • Loading branch information
baibaichen committed Jul 8, 2024
1 parent b9d93de commit b323c4d
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@
import org.apache.gluten.metrics.IMetrics;
import org.apache.gluten.metrics.NativeMetrics;

import org.apache.spark.sql.execution.utils.CHExecUtil;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class BatchIterator extends GeneralOutIterator {
Expand All @@ -43,8 +40,6 @@ public String getId() {

private native boolean nativeHasNext(long nativeHandle);

private native byte[] nativeNext(long nativeHandle);

private native long nativeCHNext(long nativeHandle);

private native void nativeClose(long nativeHandle);
Expand All @@ -54,22 +49,15 @@ public String getId() {
private native String nativeFetchMetrics(long nativeHandle);

@Override
public boolean hasNextInternal() throws IOException {
public boolean hasNextInternal() {
return nativeHasNext(handle);
}

@Override
public ColumnarBatch nextInternal() throws IOException {
public ColumnarBatch nextInternal() {
long block = nativeCHNext(handle);
CHNativeBlock nativeBlock = new CHNativeBlock(block);
int cols = nativeBlock.numColumns();
ColumnVector[] columnVectors = new ColumnVector[cols];
for (int i = 0; i < cols; i++) {
columnVectors[i] =
new CHColumnVector(
CHExecUtil.inferSparkDataType(nativeBlock.getTypeByPosition(i)), block, i);
}
return new ColumnarBatch(columnVectors, nativeBlock.numRows());
return nativeBlock.toColumnarBatch();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

public class CHColumnVector extends ColumnVector {
private final int columnPosition;
private long blockAddress;
private final long blockAddress;

public CHColumnVector(DataType type, long blockAddress, int columnPosition) {
super(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,13 @@ public static void closeFromColumnarBatch(ColumnarBatch cb) {
}

public ColumnarBatch toColumnarBatch() {
ColumnVector[] vectors = new ColumnVector[numColumns()];
for (int i = 0; i < numColumns(); i++) {
int numRows = numRows();
int cols = numColumns();
ColumnVector[] vectors = new ColumnVector[cols];
for (int i = 0; i < cols; i++) {
vectors[i] =
new CHColumnVector(CHExecUtil.inferSparkDataType(getTypeByPosition(i)), blockAddress, i);
}
int numRows = 0;
if (numColumns() != 0) {
numRows = numRows();
}
return new ColumnarBatch(vectors, numRows);
}
}

0 comments on commit b323c4d

Please sign in to comment.