diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index a3a59318320b..9cb14c5feb80 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -55,6 +55,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -95,6 +96,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.PropertyUtil;
@@ -115,8 +117,12 @@
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Parquet {
+  private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);
+
   private Parquet() {}
 
   private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
@@ -266,6 +272,47 @@ private WriteBuilder createContextFunc(
       return this;
     }
 
+    private void setBloomFilterConfig(
+        Context context,
+        MessageType parquetSchema,
+        BiConsumer<String, Boolean> withBloomFilterEnabled,
+        BiConsumer<String, Double> withBloomFilterFPP) {
+
+      Map<Integer, String> fieldIdToParquetPath =
+          parquetSchema.getColumns().stream()
+              .collect(
+                  Collectors.toMap(
+                      col -> col.getPrimitiveType().getId().intValue(),
+                      col -> String.join(".", col.getPath())));
+
+      context
+          .columnBloomFilterEnabled()
+          .forEach(
+              (colPath, isEnabled) -> {
+                Types.NestedField field = schema.findField(colPath);
+                if (field == null) {
+                  LOG.warn("Skipping bloom filter config for missing field: {}", colPath);
+                  return;
+                }
+
+                int fieldId = field.fieldId();
+                String parquetColumnPath = fieldIdToParquetPath.get(fieldId);
+                if (parquetColumnPath == null) {
+                  LOG.warn(
+                      "Skipping bloom filter config for field: {} due to missing parquetColumnPath for fieldId: {}",
+                      colPath,
+                      fieldId);
+                  return;
+                }
+
+                withBloomFilterEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled));
+                String fpp = context.columnBloomFilterFpp().get(colPath);
+                if (fpp != null) {
+                  withBloomFilterFPP.accept(parquetColumnPath, Double.parseDouble(fpp));
+                }
+              });
+    }
+
     public <D> FileAppender<D> build() throws IOException {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be null");
@@ -285,8 +332,6 @@ public <D> FileAppender<D> build() throws IOException {
       int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
       int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
       int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
-      Map<String, String> columnBloomFilterFpp = context.columnBloomFilterFpp();
-      Map<String, String> columnBloomFilterEnabled = context.columnBloomFilterEnabled();
       boolean dictionaryEnabled = context.dictionaryEnabled();
 
       if (compressionLevel != null) {
@@ -343,17 +388,8 @@ public <D> FileAppender<D> build() throws IOException {
               .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
               .withMaxBloomFilterBytes(bloomFilterMaxBytes);
 
-      for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
-        String colPath = entry.getKey();
-        String bloomEnabled = entry.getValue();
-        propsBuilder.withBloomFilterEnabled(colPath, Boolean.parseBoolean(bloomEnabled));
-      }
-
-      for (Map.Entry<String, String> entry : columnBloomFilterFpp.entrySet()) {
-        String colPath = entry.getKey();
-        String fpp = entry.getValue();
-        propsBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
-      }
+      setBloomFilterConfig(
+          context, type, propsBuilder::withBloomFilterEnabled, propsBuilder::withBloomFilterFPP);
 
       ParquetProperties parquetProperties = propsBuilder.build();
 
@@ -386,17 +422,11 @@ public <D> FileAppender<D> build() throws IOException {
             .withDictionaryPageSize(dictionaryPageSize)
             .withEncryption(fileEncryptionProperties);
 
-      for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
-        String colPath = entry.getKey();
-        String bloomEnabled = entry.getValue();
-        parquetWriteBuilder.withBloomFilterEnabled(colPath, Boolean.parseBoolean(bloomEnabled));
-      }
-
-      for (Map.Entry<String, String> entry : columnBloomFilterFpp.entrySet()) {
-        String colPath = entry.getKey();
-        String fpp = entry.getValue();
-        parquetWriteBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
-      }
+      setBloomFilterConfig(
+          context,
+          type,
+          parquetWriteBuilder::withBloomFilterEnabled,
+          parquetWriteBuilder::withBloomFilterFPP);
 
       return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
     }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
index 62f330f9f572..bd2f4ead6dbd 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
@@ -109,7 +109,7 @@ public class TestBloomRowGroupFilter {
           optional(22, "timestamp", Types.TimestampType.withoutZone()),
           optional(23, "timestamptz", Types.TimestampType.withZone()),
           optional(24, "binary", Types.BinaryType.get()),
-          optional(25, "int_decimal", Types.DecimalType.of(8, 2)),
+          optional(25, "int-decimal", Types.DecimalType.of(8, 2)),
           optional(26, "long_decimal", Types.DecimalType.of(14, 2)),
           optional(27, "fixed_decimal", Types.DecimalType.of(31, 2)));
 
@@ -140,7 +140,7 @@ public class TestBloomRowGroupFilter {
           optional(22, "_timestamp", Types.TimestampType.withoutZone()),
           optional(23, "_timestamptz", Types.TimestampType.withZone()),
           optional(24, "_binary", Types.BinaryType.get()),
-          optional(25, "_int_decimal", Types.DecimalType.of(8, 2)),
+          optional(25, "_int-decimal", Types.DecimalType.of(8, 2)),
           optional(26, "_long_decimal", Types.DecimalType.of(14, 2)),
           optional(27, "_fixed_decimal", Types.DecimalType.of(31, 2)));
 
@@ -193,6 +193,7 @@ public void createInputFile() throws IOException {
 
     // build struct field schema
     org.apache.avro.Schema structSchema = AvroSchemaUtil.convert(UNDERSCORE_STRUCT_FIELD_TYPE);
+    String compatibleFieldName = AvroSchemaUtil.makeCompatibleName("_int-decimal");
 
     OutputFile outFile = Files.localOutput(temp);
     try (FileAppender<GenericData.Record> appender =
@@ -221,7 +222,7 @@ public void createInputFile() throws IOException {
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_timestamp", "true")
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_timestamptz", "true")
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_binary", "true")
-            .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_int_decimal", "true")
+            .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_int-decimal", "true")
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_long_decimal", "true")
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_fixed_decimal", "true")
             .build()) {
@@ -256,7 +257,7 @@ public void createInputFile() throws IOException {
         builder.set("_timestamp", INSTANT.plusSeconds(i * 86400).toEpochMilli());
         builder.set("_timestamptz", INSTANT.plusSeconds(i * 86400).toEpochMilli());
         builder.set("_binary", RANDOM_BYTES.get(i));
-        builder.set("_int_decimal", new BigDecimal(String.valueOf(77.77 + i)));
+        builder.set(compatibleFieldName, new BigDecimal(String.valueOf(77.77 + i)));
         builder.set("_long_decimal", new BigDecimal(String.valueOf(88.88 + i)));
         builder.set("_fixed_decimal", new BigDecimal(String.valueOf(99.99 + i)));
 
@@ -683,23 +684,23 @@ public void testBytesEq() {
   }
 
   @Test
-  public void testIntDeciamlEq() {
+  public void testIntDecimalEq() {
     for (int i = 0; i < INT_VALUE_COUNT; i++) {
       boolean shouldRead =
           new ParquetBloomRowGroupFilter(
-                  SCHEMA, equal("int_decimal", new BigDecimal(String.valueOf(77.77 + i))))
+                  SCHEMA, equal("int-decimal", new BigDecimal(String.valueOf(77.77 + i))))
               .shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
       assertThat(shouldRead).as("Should read: decimal within range").isTrue();
     }
 
     boolean shouldRead =
-        new ParquetBloomRowGroupFilter(SCHEMA, equal("int_decimal", new BigDecimal("1234.56")))
+        new ParquetBloomRowGroupFilter(SCHEMA, equal("int-decimal", new BigDecimal("1234.56")))
             .shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
     assertThat(shouldRead).as("Should not read: decimal outside range").isFalse();
   }
 
   @Test
-  public void testLongDeciamlEq() {
+  public void testLongDecimalEq() {
     for (int i = 0; i < INT_VALUE_COUNT; i++) {
       boolean shouldRead =
           new ParquetBloomRowGroupFilter(
@@ -715,7 +716,7 @@ SCHEMA, equal("long_decimal", new BigDecimal(String.valueOf(88.88 + i))))
   }
 
   @Test
-  public void testFixedDeciamlEq() {
+  public void testFixedDecimalEq() {
     for (int i = 0; i < INT_VALUE_COUNT; i++) {
       boolean shouldRead =
           new ParquetBloomRowGroupFilter(