From 4a996783697faa1c0bd6b4fc6ceb97260f6aeb7a Mon Sep 17 00:00:00 2001
From: Manu Zhang
Date: Mon, 8 Jan 2024 18:08:07 +0800
Subject: [PATCH] Parquet: Support reading INT96 column in row group filter
 (#8988)

---
 .../iceberg/parquet/ParquetConversions.java   |  5 +++
 .../ParquetDictionaryRowGroupFilter.java      |  3 ++
 .../source/TestIcebergSourceTablesBase.java   | 32 ++++++++++++-------
 .../source/TestIcebergSourceTablesBase.java   | 32 ++++++++++++-------
 .../source/TestIcebergSourceTablesBase.java   | 28 ++++++++++------
 5 files changed, 69 insertions(+), 31 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
index e1a342b63261..0f9878d2019d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
@@ -21,6 +21,7 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
 import java.util.UUID;
 import java.util.function.Function;
@@ -112,6 +113,10 @@ static Function<Object, Object> converterFromParquet(PrimitiveType type) {
       case FIXED_LEN_BYTE_ARRAY:
       case BINARY:
         return binary -> ByteBuffer.wrap(((Binary) binary).getBytes());
+      case INT96:
+        return binary ->
+            ParquetUtil.extractTimestampInt96(
+                ByteBuffer.wrap(((Binary) binary).getBytes()).order(ByteOrder.LITTLE_ENDIAN));
       default:
     }

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index 33ec2f6817e0..1d24b7ccd71f 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -453,6 +453,9 @@ private Set<T> dict(int id, Comparator<T> comparator) {
         case DOUBLE:
           dictSet.add((T) conversion.apply(dict.decodeToDouble(i)));
           break;
+        case INT96:
+          dictSet.add((T) conversion.apply(dict.decodeToBinary(i)));
+          break;
         default:
           throw new IllegalArgumentException(
               "Cannot decode dictionary of type: "

diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 3c52652748ef..111da882fe8a 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -2156,22 +2156,32 @@ public void testTableWithInt96Timestamp() throws IOException {
             stagingLocation);

         // validate we get the expected results back
-        List<Row> expected = spark.table("parquet_table").select("tmp_col").collectAsList();
-        List<Row> actual =
-            spark
-                .read()
-                .format("iceberg")
-                .load(loadLocation(tableIdentifier))
-                .select("tmp_col")
-                .collectAsList();
-        Assertions.assertThat(actual)
-            .as("Rows must match")
-            .containsExactlyInAnyOrderElementsOf(expected);
+        testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
08:30:00')", tableIdentifier); dropTable(tableIdentifier); } } } + private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) { + List expected = + spark.table("parquet_table").select("tmp_col").filter(filterExpr).collectAsList(); + List actual = + spark + .read() + .format("iceberg") + .load(loadLocation(tableIdentifier)) + .select("tmp_col") + .filter(filterExpr) + .collectAsList(); + Assertions.assertThat(actual) + .as("Rows must match") + .containsExactlyInAnyOrderElementsOf(expected); + } + private GenericData.Record manifestRecord( Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) { GenericRecordBuilder builder = diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index ebd0933a9566..b5038f981bae 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -2154,22 +2154,32 @@ public void testTableWithInt96Timestamp() throws IOException { stagingLocation); // validate we get the expected results back - List expected = spark.table("parquet_table").select("tmp_col").collectAsList(); - List actual = - spark - .read() - .format("iceberg") - .load(loadLocation(tableIdentifier)) - .select("tmp_col") - .collectAsList(); - Assertions.assertThat(actual) - .as("Rows must match") - .containsExactlyInAnyOrderElementsOf(expected); + testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier); + testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier); + testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier); + testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier); + testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier); dropTable(tableIdentifier); } } } + private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) { + List expected = + spark.table("parquet_table").select("tmp_col").filter(filterExpr).collectAsList(); + List actual = + spark + .read() + .format("iceberg") + .load(loadLocation(tableIdentifier)) + .select("tmp_col") + .filter(filterExpr) + .collectAsList(); + Assertions.assertThat(actual) + .as("Rows must match") + .containsExactlyInAnyOrderElementsOf(expected); + } + private GenericData.Record manifestRecord( Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) { GenericRecordBuilder builder = diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 4f585eee51f4..d37d6a861690 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -2181,20 +2181,30 @@ public void testTableWithInt96Timestamp() throws IOException { stagingLocation); // validate we get the expected results back - List expected = spark.table("parquet_table").select("tmp_col").collectAsList(); - List actual = - spark - .read() - .format("iceberg") - .load(loadLocation(tableIdentifier)) - .select("tmp_col") - .collectAsList(); - assertThat(actual).as("Rows must 
match").containsExactlyInAnyOrderElementsOf(expected); + testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier); + testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier); + testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier); + testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier); + testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier); dropTable(tableIdentifier); } } } + private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) { + List expected = + spark.table("parquet_table").select("tmp_col").filter(filterExpr).collectAsList(); + List actual = + spark + .read() + .format("iceberg") + .load(loadLocation(tableIdentifier)) + .select("tmp_col") + .filter(filterExpr) + .collectAsList(); + assertThat(actual).as("Rows must match").containsExactlyInAnyOrderElementsOf(expected); + } + private GenericData.Record manifestRecord( Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) { GenericRecordBuilder builder =