Support reading compressed Arrow IPC files
adamreeve committed Sep 13, 2023
1 parent bb526f6 commit 5200852
Showing 3 changed files with 75 additions and 5 deletions.
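
At a glance: the Java test writes a ZSTD-compressed Arrow IPC (Feather V2) file and reads it back through a DataFusion listing table, and the JNI crate gains the arrow crate's ipc_compression feature so the native reader can decompress it. The sketch below condenses the writer side of the new test into a standalone program (the class name and output path are illustrative, not part of the commit):

import java.io.FileOutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.arrow.compression.CommonsCompressionFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.compression.CompressionUtil;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.types.pojo.Field;

// Illustrative class name; condensed from the writeArrowFile helper in this commit.
public class CompressedIpcWriteExample {
  public static void main(String[] args) throws Exception {
    try (BufferAllocator allocator = new RootAllocator();
        BigIntVector xVector = new BigIntVector("x", allocator)) {
      // Use enough rows that the writer actually compresses the buffers.
      int numRows = 10_000;
      for (int i = 0; i < numRows; i++) {
        xVector.setSafe(i, i);
      }
      xVector.setValueCount(numRows);

      List<FieldVector> vectors = Arrays.asList(xVector);
      List<Field> fields =
          vectors.stream().map(FieldVector::getField).collect(Collectors.toList());
      try (VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
          FileOutputStream output = new FileOutputStream("data.arrow");
          // The extra constructor arguments select ZSTD compression; the
          // CommonsCompressionFactory comes from the arrow-compression artifact.
          ArrowFileWriter writer =
              new ArrowFileWriter(
                  root,
                  null, // no dictionary provider
                  output.getChannel(),
                  null, // no custom metadata
                  IpcOption.DEFAULT,
                  new CommonsCompressionFactory(),
                  CompressionUtil.CodecType.ZSTD)) {
        writer.start();
        writer.writeBatch();
        writer.end();
      }
    }
  }
}

The seven-argument ArrowFileWriter constructor shown here is the same one the updated writeArrowFile helper in the diff uses; passing NoCompressionCodec.Factory.INSTANCE and CodecType.NO_COMPRESSION instead writes an uncompressed file.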
1 change: 1 addition & 0 deletions datafusion-java/build.gradle
@@ -13,6 +13,7 @@ dependencies {
api 'org.apache.arrow:arrow-vector:13.0.0'
implementation 'org.apache.arrow:arrow-c-data:13.0.0'
runtimeOnly 'org.apache.arrow:arrow-memory-unsafe:13.0.0'
+ testImplementation 'org.apache.arrow:arrow-compression:13.0.0'
testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1'
testImplementation 'org.apache.hadoop:hadoop-client:3.3.5'
testImplementation 'org.apache.hadoop:hadoop-common:3.3.5'
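The new arrow-compression test dependency supplies CommonsCompressionFactory, the codec factory the test passes to ArrowFileWriter below; the core arrow-vector module on its own ships only the no-op NoCompressionCodec.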
77 changes: 73 additions & 4 deletions
@@ -11,14 +11,19 @@
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+ import org.apache.arrow.compression.CommonsCompressionFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+ import org.apache.arrow.vector.compression.CompressionCodec;
+ import org.apache.arrow.vector.compression.CompressionUtil;
+ import org.apache.arrow.vector.compression.NoCompressionCodec;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.ArrowReader;
+ import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.types.pojo.Field;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -97,7 +102,7 @@ public void testArrowListingTable(@TempDir Path tempDir) throws Exception {
}
xVector.setValueCount(2);
yVector.setValueCount(2);
- writeArrowFile(arrowFilePath0, vectors);
+ writeArrowFile(arrowFilePath0, vectors, false);

xVector.reset();
yVector.reset();
@@ -107,7 +112,7 @@
}
xVector.setValueCount(2);
yVector.setValueCount(2);
- writeArrowFile(arrowFilePath1, vectors);
+ writeArrowFile(arrowFilePath1, vectors, false);
}

try (ArrowFormat format = new ArrowFormat();
@@ -125,6 +130,57 @@ public void testArrowListingTable(@TempDir Path tempDir) throws Exception {
}
}

+ @Test
+ public void testCompressedArrowIpc(@TempDir Path tempDir) throws Exception {
+   try (SessionContext context = SessionContexts.create();
+       BufferAllocator allocator = new RootAllocator()) {
+     Path dataDir = tempDir.resolve("data");
+     Files.createDirectories(dataDir);
+     Path arrowFilePath0 = dataDir.resolve("0.arrow");
+
+     // Data needs to be reasonably large otherwise compression is not used
+     int numRows = 10_000;
+
+     // Write data files in compressed Arrow IPC (Feather V2) file format
+     try (BigIntVector xVector = new BigIntVector("x", allocator)) {
+       for (int i = 0; i < numRows; i++) {
+         xVector.setSafe(i, i * 2 + 1);
+       }
+       xVector.setValueCount(numRows);
+       List<FieldVector> vectors = Arrays.asList(xVector);
+       writeArrowFile(arrowFilePath0, vectors, true);
+     }
+
+     try (ArrowFormat format = new ArrowFormat();
+         ListingOptions listingOptions =
+             ListingOptions.builder(format).withFileExtension(".arrow").build();
+         ListingTableConfig tableConfig =
+             ListingTableConfig.builder(dataDir)
+                 .withListingOptions(listingOptions)
+                 .build(context)
+                 .join();
+         ListingTable listingTable = new ListingTable(tableConfig)) {
+       context.registerTable("test", listingTable);
+       try (ArrowReader reader =
+           context
+               .sql("SELECT x FROM test")
+               .thenComposeAsync(df -> df.collect(allocator))
+               .join()) {
+
+         int globalRow = 0;
+         VectorSchemaRoot root = reader.getVectorSchemaRoot();
+         while (reader.loadNextBatch()) {
+           BigIntVector xValues = (BigIntVector) root.getVector(0);
+           for (int row = 0; row < root.getRowCount(); ++row, ++globalRow) {
+             assertEquals(globalRow * 2 + 1, xValues.get(row));
+           }
+         }
+         assertEquals(numRows, globalRow);
+       }
+     }
+   }
+ }
+
@Test
public void testDisableCollectStat(@TempDir Path tempDir) throws Exception {
try (SessionContext context = SessionContexts.create();
@@ -204,11 +260,24 @@ private static Path[] writeParquetFiles(Path dataDir) throws Exception {
return new Path[] {parquetFilePath0, parquetFilePath1};
}

- private static void writeArrowFile(Path filePath, List<FieldVector> vectors) throws Exception {
+ private static void writeArrowFile(Path filePath, List<FieldVector> vectors, boolean compressed)
+     throws Exception {
List<Field> fields = vectors.stream().map(v -> v.getField()).collect(Collectors.toList());
+ CompressionUtil.CodecType codec =
+     compressed ? CompressionUtil.CodecType.ZSTD : CompressionUtil.CodecType.NO_COMPRESSION;
+ CompressionCodec.Factory compressionFactory =
+     compressed ? new CommonsCompressionFactory() : NoCompressionCodec.Factory.INSTANCE;
try (VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
FileOutputStream output = new FileOutputStream(filePath.toString());
- ArrowFileWriter writer = new ArrowFileWriter(root, null, output.getChannel())) {
+ ArrowFileWriter writer =
+     new ArrowFileWriter(
+         root,
+         null,
+         output.getChannel(),
+         null,
+         IpcOption.DEFAULT,
+         compressionFactory,
+         codec)) {
writer.start();
writer.writeBatch();
writer.end();
2 changes: 1 addition & 1 deletion datafusion-jni/Cargo.toml
@@ -12,7 +12,7 @@ edition = "2021"
[dependencies]
jni = "^0.21.0"
tokio = "^1.32.0"
- arrow = { version = "^39.0", features = ["ffi"] }
+ arrow = { version = "^39.0", features = ["ffi", "ipc_compression"] }
datafusion = "^25.0"
futures = "0.3.28"

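The ipc_compression feature enables the arrow crate's ZSTD and LZ4 codecs, which is what lets the native IPC reader behind datafusion-jni decompress record batch buffers; without it, arrow-rs reports an error when it encounters a compressed IPC file.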
