From 5200852ae0a3da8b0fb042f3aacb216213bed2bd Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Tue, 11 Jul 2023 22:11:21 +1200 Subject: [PATCH] Support reading compressed Arrow IPC files --- datafusion-java/build.gradle | 1 + .../arrow/datafusion/TestListingTable.java | 77 ++++++++++++++++++- datafusion-jni/Cargo.toml | 2 +- 3 files changed, 75 insertions(+), 5 deletions(-) diff --git a/datafusion-java/build.gradle b/datafusion-java/build.gradle index 819c502..9492df5 100644 --- a/datafusion-java/build.gradle +++ b/datafusion-java/build.gradle @@ -13,6 +13,7 @@ dependencies { api 'org.apache.arrow:arrow-vector:13.0.0' implementation 'org.apache.arrow:arrow-c-data:13.0.0' runtimeOnly 'org.apache.arrow:arrow-memory-unsafe:13.0.0' + testImplementation 'org.apache.arrow:arrow-compression:13.0.0' testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1' testImplementation 'org.apache.hadoop:hadoop-client:3.3.5' testImplementation 'org.apache.hadoop:hadoop-common:3.3.5' diff --git a/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestListingTable.java b/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestListingTable.java index 51c1d3a..3a0e4ff 100644 --- a/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestListingTable.java +++ b/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestListingTable.java @@ -11,14 +11,19 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +import org.apache.arrow.compression.CommonsCompressionFactory; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.ArrowFileWriter; import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.message.IpcOption; import org.apache.arrow.vector.types.pojo.Field; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -97,7 +102,7 @@ public void testArrowListingTable(@TempDir Path tempDir) throws Exception { } xVector.setValueCount(2); yVector.setValueCount(2); - writeArrowFile(arrowFilePath0, vectors); + writeArrowFile(arrowFilePath0, vectors, false); xVector.reset(); yVector.reset(); @@ -107,7 +112,7 @@ public void testArrowListingTable(@TempDir Path tempDir) throws Exception { } xVector.setValueCount(2); yVector.setValueCount(2); - writeArrowFile(arrowFilePath1, vectors); + writeArrowFile(arrowFilePath1, vectors, false); } try (ArrowFormat format = new ArrowFormat(); @@ -125,6 +130,57 @@ public void testArrowListingTable(@TempDir Path tempDir) throws Exception { } } + @Test + public void testCompressedArrowIpc(@TempDir Path tempDir) throws Exception { + try (SessionContext context = SessionContexts.create(); + BufferAllocator allocator = new RootAllocator()) { + Path dataDir = tempDir.resolve("data"); + Files.createDirectories(dataDir); + Path arrowFilePath0 = dataDir.resolve("0.arrow"); + + // Data needs to be reasonably large otherwise compression is not used + int numRows = 10_000; + + // Write data files in compressed Arrow IPC (Feather V2) file format + try (BigIntVector xVector = new BigIntVector("x", allocator)) { + for (int i = 0; i < numRows; i++) { + xVector.setSafe(i, i * 2 + 1); + } + xVector.setValueCount(numRows); + List vectors = Arrays.asList(xVector); + writeArrowFile(arrowFilePath0, vectors, true); + } + + try (ArrowFormat format = new ArrowFormat(); + ListingOptions listingOptions = + ListingOptions.builder(format).withFileExtension(".arrow").build(); + ListingTableConfig tableConfig = + ListingTableConfig.builder(dataDir) + .withListingOptions(listingOptions) + .build(context) + .join(); + ListingTable listingTable = new ListingTable(tableConfig)) { + context.registerTable("test", listingTable); + try (ArrowReader reader = + context + .sql("SELECT x FROM test") + .thenComposeAsync(df -> df.collect(allocator)) + .join()) { + + int globalRow = 0; + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + while (reader.loadNextBatch()) { + BigIntVector xValues = (BigIntVector) root.getVector(0); + for (int row = 0; row < root.getRowCount(); ++row, ++globalRow) { + assertEquals(globalRow * 2 + 1, xValues.get(row)); + } + } + assertEquals(numRows, globalRow); + } + } + } + } + @Test public void testDisableCollectStat(@TempDir Path tempDir) throws Exception { try (SessionContext context = SessionContexts.create(); @@ -204,11 +260,24 @@ private static Path[] writeParquetFiles(Path dataDir) throws Exception { return new Path[] {parquetFilePath0, parquetFilePath1}; } - private static void writeArrowFile(Path filePath, List vectors) throws Exception { + private static void writeArrowFile(Path filePath, List vectors, boolean compressed) + throws Exception { List fields = vectors.stream().map(v -> v.getField()).collect(Collectors.toList()); + CompressionUtil.CodecType codec = + compressed ? CompressionUtil.CodecType.ZSTD : CompressionUtil.CodecType.NO_COMPRESSION; + CompressionCodec.Factory compressionFactory = + compressed ? new CommonsCompressionFactory() : NoCompressionCodec.Factory.INSTANCE; try (VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors); FileOutputStream output = new FileOutputStream(filePath.toString()); - ArrowFileWriter writer = new ArrowFileWriter(root, null, output.getChannel())) { + ArrowFileWriter writer = + new ArrowFileWriter( + root, + null, + output.getChannel(), + null, + IpcOption.DEFAULT, + compressionFactory, + codec)) { writer.start(); writer.writeBatch(); writer.end(); diff --git a/datafusion-jni/Cargo.toml b/datafusion-jni/Cargo.toml index 04bd5fd..45cc4e8 100644 --- a/datafusion-jni/Cargo.toml +++ b/datafusion-jni/Cargo.toml @@ -12,7 +12,7 @@ edition = "2021" [dependencies] jni = "^0.21.0" tokio = "^1.32.0" -arrow = { version = "^39.0", features = ["ffi"] } +arrow = { version = "^39.0", features = ["ffi", "ipc_compression"] } datafusion = "^25.0" futures = "0.3.28"