From 1788818501b0c2076d799b2a276fd2c654dd8099 Mon Sep 17 00:00:00 2001
From: Adam Reeve
Date: Wed, 13 Sep 2023 15:57:22 +1200
Subject: [PATCH 1/3] Upgrade to datafusion 25.0

---
 datafusion-jni/Cargo.toml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion-jni/Cargo.toml b/datafusion-jni/Cargo.toml
index 07c3e9d..04bd5fd 100644
--- a/datafusion-jni/Cargo.toml
+++ b/datafusion-jni/Cargo.toml
@@ -12,8 +12,8 @@ edition = "2021"
 [dependencies]
 jni = "^0.21.0"
 tokio = "^1.32.0"
-arrow = { version = "^36.0", features = ["ffi"] }
-datafusion = "^22.0"
+arrow = { version = "^39.0", features = ["ffi"] }
+datafusion = "^25.0"
 futures = "0.3.28"
 
 [lib]

From bb526f68ebd883faa8aebcee189400dd59a7ed03 Mon Sep 17 00:00:00 2001
From: Adam Reeve
Date: Tue, 30 May 2023 10:36:48 +0100
Subject: [PATCH 2/3] Add support for the Arrow file format

---
 .../apache/arrow/datafusion/ArrowFormat.java  | 14 ++++
 .../apache/arrow/datafusion/FileFormats.java  |  2 +
 .../arrow/datafusion/TestListingTable.java    | 65 +++++++++++++++++++
 datafusion-jni/src/file_formats.rs            | 10 +++
 4 files changed, 91 insertions(+)
 create mode 100644 datafusion-java/src/main/java/org/apache/arrow/datafusion/ArrowFormat.java

diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/ArrowFormat.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/ArrowFormat.java
new file mode 100644
index 0000000..56f3f94
--- /dev/null
+++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/ArrowFormat.java
@@ -0,0 +1,14 @@
+package org.apache.arrow.datafusion;
+
+/** The Apache Arrow IPC file format configuration. This format is also known as Feather V2 */
+public class ArrowFormat extends AbstractProxy implements FileFormat {
+  /** Create a new ArrowFormat with default options */
+  public ArrowFormat() {
+    super(FileFormats.createArrow());
+  }
+
+  @Override
+  void doClose(long pointer) {
+    FileFormats.destroyFileFormat(pointer);
+  }
+}
diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/FileFormats.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/FileFormats.java
index 0712813..3efd9e1 100644
--- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/FileFormats.java
+++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/FileFormats.java
@@ -4,6 +4,8 @@ class FileFormats {
 
   private FileFormats() {}
 
+  static native long createArrow();
+
   static native long createCsv();
 
   static native long createParquet();
diff --git a/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestListingTable.java b/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestListingTable.java
index d62ed1d..51c1d3a 100644
--- a/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestListingTable.java
+++ b/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestListingTable.java
@@ -2,6 +2,7 @@
 
 import static org.junit.jupiter.api.Assertions.*;
 
+import java.io.FileOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -9,12 +10,16 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
 import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Field;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -71,6 +76,55 @@ public void testParquetListingTable(@TempDir Path tempDir) throws Exception {
     }
   }
 
+  @Test
+  public void testArrowListingTable(@TempDir Path tempDir) throws Exception {
+    try (SessionContext context = SessionContexts.create();
+        BufferAllocator allocator = new RootAllocator()) {
+      Path dataDir = tempDir.resolve("data");
+      Files.createDirectories(dataDir);
+
+      Path arrowFilePath0 = dataDir.resolve("0.arrow");
+      Path arrowFilePath1 = dataDir.resolve("1.arrow");
+
+      // Write data files in Arrow IPC (Feather V2) file format
+      try (BigIntVector xVector = new BigIntVector("x", allocator);
+          BigIntVector yVector = new BigIntVector("y", allocator)) {
+        List<FieldVector> vectors = Arrays.asList(xVector, yVector);
+
+        for (int i = 0; i < 2; i++) {
+          xVector.setSafe(i, i * 2 + 1);
+          yVector.setSafe(i, i * 2 + 2);
+        }
+        xVector.setValueCount(2);
+        yVector.setValueCount(2);
+        writeArrowFile(arrowFilePath0, vectors);
+
+        xVector.reset();
+        yVector.reset();
+        for (int i = 0; i < 2; i++) {
+          xVector.setSafe(i, i * 2 + 1);
+          yVector.setSafe(i, i * 2 + 12);
+        }
+        xVector.setValueCount(2);
+        yVector.setValueCount(2);
+        writeArrowFile(arrowFilePath1, vectors);
+      }
+
+      try (ArrowFormat format = new ArrowFormat();
+          ListingOptions listingOptions =
+              ListingOptions.builder(format).withFileExtension(".arrow").build();
+          ListingTableConfig tableConfig =
+              ListingTableConfig.builder(dataDir)
+                  .withListingOptions(listingOptions)
+                  .build(context)
+                  .join();
+          ListingTable listingTable = new ListingTable(tableConfig)) {
+        context.registerTable("test", listingTable);
+        testQuery(context, allocator);
+      }
+    }
+  }
+
   @Test
   public void testDisableCollectStat(@TempDir Path tempDir) throws Exception {
     try (SessionContext context = SessionContexts.create();
@@ -150,6 +204,17 @@ private static Path[] writeParquetFiles(Path dataDir) throws Exception {
     return new Path[] {parquetFilePath0, parquetFilePath1};
   }
 
+  private static void writeArrowFile(Path filePath, List<FieldVector> vectors) throws Exception {
+    List<Field> fields = vectors.stream().map(v -> v.getField()).collect(Collectors.toList());
+    try (VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
+        FileOutputStream output = new FileOutputStream(filePath.toString());
+        ArrowFileWriter writer = new ArrowFileWriter(root, null, output.getChannel())) {
+      writer.start();
+      writer.writeBatch();
+      writer.end();
+    }
+  }
+
   private static void testQuery(SessionContext context, BufferAllocator allocator)
       throws Exception {
     try (ArrowReader reader =
diff --git a/datafusion-jni/src/file_formats.rs b/datafusion-jni/src/file_formats.rs
index 96b73f4..6f6f166 100644
--- a/datafusion-jni/src/file_formats.rs
+++ b/datafusion-jni/src/file_formats.rs
@@ -1,3 +1,4 @@
+use datafusion::datasource::file_format::arrow::ArrowFormat;
 use datafusion::datasource::file_format::csv::CsvFormat;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
@@ -28,6 +29,15 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_FileFormats_createParque
     Box::into_raw(Box::new(format)) as jlong
 }
 
+#[no_mangle]
+pub extern "system" fn Java_org_apache_arrow_datafusion_FileFormats_createArrow(
+    _env: JNIEnv,
+    _class: JClass,
+) -> jlong {
+    let format: Arc<dyn FileFormat> = Arc::new(ArrowFormat::default());
+    Box::into_raw(Box::new(format)) as jlong
+}
+
 #[no_mangle]
 pub extern "system" fn Java_org_apache_arrow_datafusion_FileFormats_destroyFileFormat(
     _env: JNIEnv,

From 5200852ae0a3da8b0fb042f3aacb216213bed2bd Mon Sep 17 00:00:00 2001
From: Adam Reeve
Date: Tue, 11 Jul 2023 22:11:21 +1200
Subject: [PATCH 3/3] Support reading compressed Arrow IPC files

---
 datafusion-java/build.gradle                |  1 +
 .../arrow/datafusion/TestListingTable.java  | 77 ++++++++++++++++++-
 datafusion-jni/Cargo.toml                   |  2 +-
 3 files changed, 75 insertions(+), 5 deletions(-)

diff --git a/datafusion-java/build.gradle b/datafusion-java/build.gradle
index 819c502..9492df5 100644
--- a/datafusion-java/build.gradle
+++ b/datafusion-java/build.gradle
@@ -13,6 +13,7 @@ dependencies {
     api 'org.apache.arrow:arrow-vector:13.0.0'
     implementation 'org.apache.arrow:arrow-c-data:13.0.0'
     runtimeOnly 'org.apache.arrow:arrow-memory-unsafe:13.0.0'
+    testImplementation 'org.apache.arrow:arrow-compression:13.0.0'
    testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1'
    testImplementation 'org.apache.hadoop:hadoop-client:3.3.5'
    testImplementation 'org.apache.hadoop:hadoop-common:3.3.5'
diff --git a/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestListingTable.java b/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestListingTable.java
index 51c1d3a..3a0e4ff 100644
--- a/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestListingTable.java
+++ b/datafusion-java/src/test/java/org/apache/arrow/datafusion/TestListingTable.java
@@ -11,14 +11,19 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.arrow.compression.CommonsCompressionFactory;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
 import org.apache.arrow.vector.ipc.ArrowFileWriter;
 import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.IpcOption;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -97,7 +102,7 @@ public void testArrowListingTable(@TempDir Path tempDir) throws Exception {
         }
         xVector.setValueCount(2);
         yVector.setValueCount(2);
-        writeArrowFile(arrowFilePath0, vectors);
+        writeArrowFile(arrowFilePath0, vectors, false);
 
         xVector.reset();
         yVector.reset();
@@ -107,7 +112,7 @@ public void testArrowListingTable(@TempDir Path tempDir) throws Exception {
         }
         xVector.setValueCount(2);
         yVector.setValueCount(2);
-        writeArrowFile(arrowFilePath1, vectors);
+        writeArrowFile(arrowFilePath1, vectors, false);
       }
 
       try (ArrowFormat format = new ArrowFormat();
@@ -125,6 +130,57 @@ public void testArrowListingTable(@TempDir Path tempDir) throws Exception {
     }
   }
 
+  @Test
+  public void testCompressedArrowIpc(@TempDir Path tempDir) throws Exception {
+    try (SessionContext context = SessionContexts.create();
+        BufferAllocator allocator = new RootAllocator()) {
+      Path dataDir = tempDir.resolve("data");
+      Files.createDirectories(dataDir);
+      Path arrowFilePath0 = dataDir.resolve("0.arrow");
+
+      // Data needs to be reasonably large otherwise compression is not used
+      int numRows = 10_000;
+
+      // Write data files in compressed Arrow IPC (Feather V2) file format
BigIntVector("x", allocator)) { + for (int i = 0; i < numRows; i++) { + xVector.setSafe(i, i * 2 + 1); + } + xVector.setValueCount(numRows); + List vectors = Arrays.asList(xVector); + writeArrowFile(arrowFilePath0, vectors, true); + } + + try (ArrowFormat format = new ArrowFormat(); + ListingOptions listingOptions = + ListingOptions.builder(format).withFileExtension(".arrow").build(); + ListingTableConfig tableConfig = + ListingTableConfig.builder(dataDir) + .withListingOptions(listingOptions) + .build(context) + .join(); + ListingTable listingTable = new ListingTable(tableConfig)) { + context.registerTable("test", listingTable); + try (ArrowReader reader = + context + .sql("SELECT x FROM test") + .thenComposeAsync(df -> df.collect(allocator)) + .join()) { + + int globalRow = 0; + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + while (reader.loadNextBatch()) { + BigIntVector xValues = (BigIntVector) root.getVector(0); + for (int row = 0; row < root.getRowCount(); ++row, ++globalRow) { + assertEquals(globalRow * 2 + 1, xValues.get(row)); + } + } + assertEquals(numRows, globalRow); + } + } + } + } + @Test public void testDisableCollectStat(@TempDir Path tempDir) throws Exception { try (SessionContext context = SessionContexts.create(); @@ -204,11 +260,24 @@ private static Path[] writeParquetFiles(Path dataDir) throws Exception { return new Path[] {parquetFilePath0, parquetFilePath1}; } - private static void writeArrowFile(Path filePath, List vectors) throws Exception { + private static void writeArrowFile(Path filePath, List vectors, boolean compressed) + throws Exception { List fields = vectors.stream().map(v -> v.getField()).collect(Collectors.toList()); + CompressionUtil.CodecType codec = + compressed ? CompressionUtil.CodecType.ZSTD : CompressionUtil.CodecType.NO_COMPRESSION; + CompressionCodec.Factory compressionFactory = + compressed ? new CommonsCompressionFactory() : NoCompressionCodec.Factory.INSTANCE; try (VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors); FileOutputStream output = new FileOutputStream(filePath.toString()); - ArrowFileWriter writer = new ArrowFileWriter(root, null, output.getChannel())) { + ArrowFileWriter writer = + new ArrowFileWriter( + root, + null, + output.getChannel(), + null, + IpcOption.DEFAULT, + compressionFactory, + codec)) { writer.start(); writer.writeBatch(); writer.end(); diff --git a/datafusion-jni/Cargo.toml b/datafusion-jni/Cargo.toml index 04bd5fd..45cc4e8 100644 --- a/datafusion-jni/Cargo.toml +++ b/datafusion-jni/Cargo.toml @@ -12,7 +12,7 @@ edition = "2021" [dependencies] jni = "^0.21.0" tokio = "^1.32.0" -arrow = { version = "^39.0", features = ["ffi"] } +arrow = { version = "^39.0", features = ["ffi", "ipc_compression"] } datafusion = "^25.0" futures = "0.3.28"