From d7e68915d5874cac8ac5f062f35169f7615d0b06 Mon Sep 17 00:00:00 2001
From: huaxingao
Date: Mon, 16 Dec 2024 15:43:52 -0800
Subject: [PATCH 1/4] Use compatible column name to set Parquet bloom filter

---
 .../org/apache/iceberg/parquet/Parquet.java   | 74 +++++++++++++------
 .../parquet/TestBloomRowGroupFilter.java      | 19 ++---
 2 files changed, 60 insertions(+), 33 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index a3a59318320b..ac15493fe11d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -55,6 +55,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -95,6 +96,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.PropertyUtil;
@@ -115,8 +117,12 @@
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Parquet {
+  private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);
+
   private Parquet() {}
 
   private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
@@ -266,6 +272,43 @@ private WriteBuilder createContextFunc(
       return this;
     }
 
+    private void setBloomFilterConfig(
+        Context context,
+        MessageType parquetSchema,
+        BiConsumer<String, Boolean> withBloomFilterEnabled,
+        BiConsumer<String, Double> withBloomFilterFPP) {
+
+      Map<Integer, String> fieldIdToParquetPath =
+          parquetSchema.getColumns().stream()
+              .collect(
+                  Collectors.toMap(
+                      col -> col.getPrimitiveType().getId().intValue(),
+                      col -> String.join(".", col.getPath())));
+
+      context
+          .columnBloomFilterEnabled()
+          .forEach(
+              (colPath, isEnabled) -> {
+                Types.NestedField fieldId = schema.caseInsensitiveFindField(colPath);
+                if (fieldId == null) {
+                  LOG.warn("Skipping bloom filter config for missing field: {}", colPath);
+                  return;
+                }
+
+                String parquetColumnPath = fieldIdToParquetPath.get(fieldId.fieldId());
+                if (parquetColumnPath == null) {
+                  LOG.warn("Skipping bloom filter config for missing field: {}", fieldId);
+                  return;
+                }
+
+                withBloomFilterEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled));
+                String fpp = context.columnBloomFilterFpp().get(colPath);
+                if (fpp != null) {
+                  withBloomFilterFPP.accept(parquetColumnPath, Double.parseDouble(fpp));
+                }
+              });
+    }
+
     public <D> FileAppender<D> build() throws IOException {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be null");
@@ -285,8 +328,6 @@ public <D> FileAppender<D> build() throws IOException {
       int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
       int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
       int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
-      Map<String, String> columnBloomFilterFpp = context.columnBloomFilterFpp();
-      Map<String, String> columnBloomFilterEnabled = context.columnBloomFilterEnabled();
       boolean dictionaryEnabled = context.dictionaryEnabled();
 
       if (compressionLevel != null) {
@@ -343,17 +384,8 @@ public <D> FileAppender<D> build() throws IOException {
               .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
               .withMaxBloomFilterBytes(bloomFilterMaxBytes);
 
-      for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
-        String colPath = entry.getKey();
-        String bloomEnabled = entry.getValue();
-        propsBuilder.withBloomFilterEnabled(colPath, Boolean.parseBoolean(bloomEnabled));
-      }
-
-      for (Map.Entry<String, String> entry : columnBloomFilterFpp.entrySet()) {
-        String colPath = entry.getKey();
-        String fpp = entry.getValue();
-        propsBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
-      }
+      setBloomFilterConfig(
+          context, type, propsBuilder::withBloomFilterEnabled, propsBuilder::withBloomFilterFPP);
 
       ParquetProperties parquetProperties = propsBuilder.build();
 
@@ -386,17 +418,11 @@ public <D> FileAppender<D> build() throws IOException {
               .withDictionaryPageSize(dictionaryPageSize)
               .withEncryption(fileEncryptionProperties);
 
-      for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
-        String colPath = entry.getKey();
-        String bloomEnabled = entry.getValue();
-        parquetWriteBuilder.withBloomFilterEnabled(colPath, Boolean.parseBoolean(bloomEnabled));
-      }
-
-      for (Map.Entry<String, String> entry : columnBloomFilterFpp.entrySet()) {
-        String colPath = entry.getKey();
-        String fpp = entry.getValue();
-        parquetWriteBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
-      }
+      setBloomFilterConfig(
+          context,
+          type,
+          parquetWriteBuilder::withBloomFilterEnabled,
+          parquetWriteBuilder::withBloomFilterFPP);
 
       return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
     }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
index 62f330f9f572..bd2f4ead6dbd 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
@@ -109,7 +109,7 @@ public class TestBloomRowGroupFilter {
           optional(22, "timestamp", Types.TimestampType.withoutZone()),
           optional(23, "timestamptz", Types.TimestampType.withZone()),
           optional(24, "binary", Types.BinaryType.get()),
-          optional(25, "int_decimal", Types.DecimalType.of(8, 2)),
+          optional(25, "int-decimal", Types.DecimalType.of(8, 2)),
           optional(26, "long_decimal", Types.DecimalType.of(14, 2)),
           optional(27, "fixed_decimal", Types.DecimalType.of(31, 2)));
 
@@ -140,7 +140,7 @@ public class TestBloomRowGroupFilter {
           optional(22, "_timestamp", Types.TimestampType.withoutZone()),
           optional(23, "_timestamptz", Types.TimestampType.withZone()),
           optional(24, "_binary", Types.BinaryType.get()),
-          optional(25, "_int_decimal", Types.DecimalType.of(8, 2)),
+          optional(25, "_int-decimal", Types.DecimalType.of(8, 2)),
           optional(26, "_long_decimal", Types.DecimalType.of(14, 2)),
           optional(27, "_fixed_decimal", Types.DecimalType.of(31, 2)));
 
@@ -193,6 +193,7 @@ public void createInputFile() throws IOException {
     // build struct field schema
     org.apache.avro.Schema structSchema = AvroSchemaUtil.convert(UNDERSCORE_STRUCT_FIELD_TYPE);
+    String compatibleFieldName = AvroSchemaUtil.makeCompatibleName("_int-decimal");
 
     OutputFile outFile = Files.localOutput(temp);
     try (FileAppender<GenericData.Record> appender =
@@ -221,7 +222,7 @@ public void createInputFile() throws IOException {
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_timestamp", "true")
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_timestamptz", "true")
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_binary", "true")
-            .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_int_decimal", "true")
+            .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_int-decimal", "true")
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_long_decimal", "true")
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_fixed_decimal", "true")
             .build()) {
@@ -256,7 +257,7 @@ public void createInputFile() throws IOException {
         builder.set("_timestamp", INSTANT.plusSeconds(i * 86400).toEpochMilli());
         builder.set("_timestamptz", INSTANT.plusSeconds(i * 86400).toEpochMilli());
         builder.set("_binary", RANDOM_BYTES.get(i));
-        builder.set("_int_decimal", new BigDecimal(String.valueOf(77.77 + i)));
+        builder.set(compatibleFieldName, new BigDecimal(String.valueOf(77.77 + i)));
         builder.set("_long_decimal", new BigDecimal(String.valueOf(88.88 + i)));
         builder.set("_fixed_decimal", new BigDecimal(String.valueOf(99.99 + i)));
 
@@ -683,23 +684,23 @@ public void testBytesEq() {
   }
 
   @Test
-  public void testIntDeciamlEq() {
+  public void testIntDecimalEq() {
     for (int i = 0; i < INT_VALUE_COUNT; i++) {
       boolean shouldRead =
           new ParquetBloomRowGroupFilter(
-                  SCHEMA, equal("int_decimal", new BigDecimal(String.valueOf(77.77 + i))))
+                  SCHEMA, equal("int-decimal", new BigDecimal(String.valueOf(77.77 + i))))
               .shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
       assertThat(shouldRead).as("Should read: decimal within range").isTrue();
     }
 
     boolean shouldRead =
-        new ParquetBloomRowGroupFilter(SCHEMA, equal("int_decimal", new BigDecimal("1234.56")))
+        new ParquetBloomRowGroupFilter(SCHEMA, equal("int-decimal", new BigDecimal("1234.56")))
             .shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
     assertThat(shouldRead).as("Should not read: decimal outside range").isFalse();
   }
 
   @Test
-  public void testLongDeciamlEq() {
+  public void testLongDecimalEq() {
     for (int i = 0; i < INT_VALUE_COUNT; i++) {
       boolean shouldRead =
           new ParquetBloomRowGroupFilter(
@@ -715,7 +716,7 @@ SCHEMA, equal("long_decimal", new BigDecimal(String.valueOf(88.88 + i))))
   }
 
   @Test
-  public void testFixedDeciamlEq() {
+  public void testFixedDecimalEq() {
     for (int i = 0; i < INT_VALUE_COUNT; i++) {
       boolean shouldRead =
           new ParquetBloomRowGroupFilter(
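PATCH 1 stops passing the user-facing Iceberg column name straight to Parquet's bloom filter properties. A name like "int-decimal" is not a valid Avro identifier, so the writer stores that column under a sanitized name, and a bloom filter configured with the raw name would silently target a column that does not exist in the file. The patch instead resolves the name to a field id, which survives the renaming, and looks up the column path Parquet actually wrote. The following is a minimal, self-contained sketch of that id-based resolution (not Iceberg's code path; the one-column schema, the sanitized name, and the class name are hypothetical):

    import java.util.Map;
    import java.util.stream.Collectors;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Types;

    public class FieldIdPathSketch {
      public static void main(String[] args) {
        // Hypothetical file schema: Iceberg column "int-decimal" (field id 25)
        // was written under the Avro-sanitized name "int_x2Ddecimal".
        MessageType fileSchema =
            Types.buildMessage()
                .addFields(Types.optional(PrimitiveTypeName.INT32).id(25).named("int_x2Ddecimal"))
                .named("table");

        // The same mapping the patch builds: field id -> dotted Parquet column path.
        Map<Integer, String> fieldIdToParquetPath =
            fileSchema.getColumns().stream()
                .collect(
                    Collectors.toMap(
                        col -> col.getPrimitiveType().getId().intValue(),
                        col -> String.join(".", col.getPath())));

        // Looking up "int-decimal" in the Iceberg schema yields field id 25,
        // which resolves here to the sanitized path the writer actually used.
        System.out.println(fieldIdToParquetPath.get(25)); // prints: int_x2Ddecimal
      }
    }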
From 75b3b5a0b9e12ea4ec8e753dbd45511692f61e22 Mon Sep 17 00:00:00 2001
From: huaxingao
Date: Mon, 16 Dec 2024 17:43:25 -0800
Subject: [PATCH 2/4] change to schema.findField

---
 parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index ac15493fe11d..310435209bac 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -289,7 +289,7 @@ private void setBloomFilterConfig(
           .columnBloomFilterEnabled()
           .forEach(
               (colPath, isEnabled) -> {
-                Types.NestedField fieldId = schema.caseInsensitiveFindField(colPath);
+                Types.NestedField fieldId = schema.findField(colPath);
                 if (fieldId == null) {
                   LOG.warn("Skipping bloom filter config for missing field: {}", colPath);
                   return;
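PATCH 2 tightens the lookup: schema.findField(colPath) requires the table property key to match the column name exactly, where caseInsensitiveFindField would also have accepted a differently-cased key. A short sketch of the difference (the one-field schema is hypothetical, not from the patch):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class FindFieldSketch {
      public static void main(String[] args) {
        Schema schema = new Schema(Types.NestedField.optional(1, "Id", Types.LongType.get()));

        // Exact match only: prints null because the schema spells the name "Id".
        System.out.println(schema.findField("id"));

        // Case-insensitive lookup resolves to the "Id" field.
        System.out.println(schema.caseInsensitiveFindField("id"));
      }
    }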
From 942d24f30172c5bf1686b404c345bea5edc86625 Mon Sep 17 00:00:00 2001
From: huaxingao
Date: Fri, 3 Jan 2025 00:04:09 -0800
Subject: [PATCH 3/4] address comments

---
 .../org/apache/iceberg/parquet/Parquet.java   | 74 ++++++-------------
 .../parquet/TestBloomRowGroupFilter.java      | 39 +++++++---
 2 files changed, 54 insertions(+), 59 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 310435209bac..b3fda2554abc 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -55,7 +55,6 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
-import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -96,7 +95,6 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.PropertyUtil;
@@ -117,12 +115,8 @@
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class Parquet {
-  private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);
-
   private Parquet() {}
 
   private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
@@ -272,43 +266,6 @@ private WriteBuilder createContextFunc(
       return this;
     }
 
-    private void setBloomFilterConfig(
-        Context context,
-        MessageType parquetSchema,
-        BiConsumer<String, Boolean> withBloomFilterEnabled,
-        BiConsumer<String, Double> withBloomFilterFPP) {
-
-      Map<Integer, String> fieldIdToParquetPath =
-          parquetSchema.getColumns().stream()
-              .collect(
-                  Collectors.toMap(
-                      col -> col.getPrimitiveType().getId().intValue(),
-                      col -> String.join(".", col.getPath())));
-
-      context
-          .columnBloomFilterEnabled()
-          .forEach(
-              (colPath, isEnabled) -> {
-                Types.NestedField fieldId = schema.findField(colPath);
-                if (fieldId == null) {
-                  LOG.warn("Skipping bloom filter config for missing field: {}", colPath);
-                  return;
-                }
-
-                String parquetColumnPath = fieldIdToParquetPath.get(fieldId.fieldId());
-                if (parquetColumnPath == null) {
-                  LOG.warn("Skipping bloom filter config for missing field: {}", fieldId);
-                  return;
-                }
-
-                withBloomFilterEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled));
-                String fpp = context.columnBloomFilterFpp().get(colPath);
-                if (fpp != null) {
-                  withBloomFilterFPP.accept(parquetColumnPath, Double.parseDouble(fpp));
-                }
-              });
-    }
-
     public <D> FileAppender<D> build() throws IOException {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be null");
@@ -328,6 +285,8 @@ public <D> FileAppender<D> build() throws IOException {
       int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
       int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
       int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
+      Map<String, String> columnBloomFilterFpp = context.columnBloomFilterFpp();
+      Map<String, String> columnBloomFilterEnabled = context.columnBloomFilterEnabled();
       boolean dictionaryEnabled = context.dictionaryEnabled();
 
       if (compressionLevel != null) {
@@ -384,8 +343,17 @@ public <D> FileAppender<D> build() throws IOException {
               .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
               .withMaxBloomFilterBytes(bloomFilterMaxBytes);
 
-      setBloomFilterConfig(
-          context, type, propsBuilder::withBloomFilterEnabled, propsBuilder::withBloomFilterFPP);
+      for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
+        String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
+        String bloomEnabled = entry.getValue();
+        propsBuilder.withBloomFilterEnabled(colPath, Boolean.parseBoolean(bloomEnabled));
+      }
+
+      for (Map.Entry<String, String> entry : columnBloomFilterFpp.entrySet()) {
+        String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
+        String fpp = entry.getValue();
+        propsBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
+      }
 
       ParquetProperties parquetProperties = propsBuilder.build();
 
@@ -418,11 +386,17 @@ public <D> FileAppender<D> build() throws IOException {
              .withDictionaryPageSize(dictionaryPageSize)
              .withEncryption(fileEncryptionProperties);
 
-      setBloomFilterConfig(
-          context,
-          type,
-          parquetWriteBuilder::withBloomFilterEnabled,
-          parquetWriteBuilder::withBloomFilterFPP);
+      for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
+        String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
+        String bloomEnabled = entry.getValue();
+        parquetWriteBuilder.withBloomFilterEnabled(colPath, Boolean.parseBoolean(bloomEnabled));
+      }
+
+      for (Map.Entry<String, String> entry : columnBloomFilterFpp.entrySet()) {
+        String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
+        String fpp = entry.getValue();
+        parquetWriteBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
+      }
 
       return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
     }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
index bd2f4ead6dbd..2812d915176c 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
@@ -109,9 +109,10 @@ public class TestBloomRowGroupFilter {
           optional(22, "timestamp", Types.TimestampType.withoutZone()),
           optional(23, "timestamptz", Types.TimestampType.withZone()),
           optional(24, "binary", Types.BinaryType.get()),
-          optional(25, "int-decimal", Types.DecimalType.of(8, 2)),
+          optional(25, "int_decimal", Types.DecimalType.of(8, 2)),
           optional(26, "long_decimal", Types.DecimalType.of(14, 2)),
-          optional(27, "fixed_decimal", Types.DecimalType.of(31, 2)));
+          optional(27, "fixed_decimal", Types.DecimalType.of(31, 2)),
+          optional(28, "incompatible-name", Types.DecimalType.of(8, 2)));
 
   private static final Types.StructType UNDERSCORE_STRUCT_FIELD_TYPE =
       Types.StructType.of(Types.NestedField.required(16, "_int_field", IntegerType.get()));
@@ -140,9 +141,10 @@ public class TestBloomRowGroupFilter {
           optional(22, "_timestamp", Types.TimestampType.withoutZone()),
           optional(23, "_timestamptz", Types.TimestampType.withZone()),
           optional(24, "_binary", Types.BinaryType.get()),
-          optional(25, "_int-decimal", Types.DecimalType.of(8, 2)),
+          optional(25, "_int_decimal", Types.DecimalType.of(8, 2)),
           optional(26, "_long_decimal", Types.DecimalType.of(14, 2)),
-          optional(27, "_fixed_decimal", Types.DecimalType.of(31, 2)));
+          optional(27, "_fixed_decimal", Types.DecimalType.of(31, 2)),
+          optional(28, "_incompatible-name", Types.DecimalType.of(8, 2)));
 
   private static final String TOO_LONG_FOR_STATS;
 
@@ -193,7 +195,7 @@ public void createInputFile() throws IOException {
     // build struct field schema
     org.apache.avro.Schema structSchema = AvroSchemaUtil.convert(UNDERSCORE_STRUCT_FIELD_TYPE);
-    String compatibleFieldName = AvroSchemaUtil.makeCompatibleName("_int-decimal");
+    String compatibleFieldName = AvroSchemaUtil.makeCompatibleName("_incompatible-name");
 
     OutputFile outFile = Files.localOutput(temp);
     try (FileAppender<GenericData.Record> appender =
@@ -222,9 +224,10 @@ public void createInputFile() throws IOException {
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_timestamp", "true")
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_timestamptz", "true")
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_binary", "true")
-            .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_int-decimal", "true")
+            .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_int_decimal", "true")
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_long_decimal", "true")
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_fixed_decimal", "true")
+            .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_incompatible-name", "true")
             .build()) {
       GenericRecordBuilder builder = new GenericRecordBuilder(convert(FILE_SCHEMA, "table"));
       // create 50 records
@@ -257,9 +260,10 @@ public void createInputFile() throws IOException {
         builder.set("_timestamp", INSTANT.plusSeconds(i * 86400).toEpochMilli());
         builder.set("_timestamptz", INSTANT.plusSeconds(i * 86400).toEpochMilli());
         builder.set("_binary", RANDOM_BYTES.get(i));
-        builder.set(compatibleFieldName, new BigDecimal(String.valueOf(77.77 + i)));
+        builder.set("_int_decimal", new BigDecimal(String.valueOf(77.77 + i)));
         builder.set("_long_decimal", new BigDecimal(String.valueOf(88.88 + i)));
         builder.set("_fixed_decimal", new BigDecimal(String.valueOf(99.99 + i)));
+        builder.set(compatibleFieldName, new BigDecimal(String.valueOf(77.77 + i)));
 
         appender.add(builder.build());
       }
@@ -688,13 +692,13 @@ public void testIntDecimalEq() {
     for (int i = 0; i < INT_VALUE_COUNT; i++) {
       boolean shouldRead =
           new ParquetBloomRowGroupFilter(
-                  SCHEMA, equal("int-decimal", new BigDecimal(String.valueOf(77.77 + i))))
+                  SCHEMA, equal("int_decimal", new BigDecimal(String.valueOf(77.77 + i))))
               .shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
       assertThat(shouldRead).as("Should read: decimal within range").isTrue();
     }
 
     boolean shouldRead =
-        new ParquetBloomRowGroupFilter(SCHEMA, equal("int-decimal", new BigDecimal("1234.56")))
+        new ParquetBloomRowGroupFilter(SCHEMA, equal("int_decimal", new BigDecimal("1234.56")))
             .shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
     assertThat(shouldRead).as("Should not read: decimal outside range").isFalse();
   }
@@ -1190,4 +1194,21 @@ public void testTransformFilter() {
         .as("Should read: filter contains non-reference evaluate as True")
         .isTrue();
   }
+
+  @Test
+  public void testIncompatibleColumnNameEq() {
+    for (int i = 0; i < INT_VALUE_COUNT; i++) {
+      boolean shouldRead =
+          new ParquetBloomRowGroupFilter(
+                  SCHEMA, equal("incompatible-name", new BigDecimal(String.valueOf(77.77 + i))))
+              .shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
+      assertThat(shouldRead).as("Should read: decimal within range").isTrue();
+    }
+
+    boolean shouldRead =
+        new ParquetBloomRowGroupFilter(
+                SCHEMA, equal("incompatible-name", new BigDecimal("1234.56")))
+            .shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
+    assertThat(shouldRead).as("Should not read: decimal outside range").isFalse();
+  }
 }
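PATCH 3 above takes a different route: instead of mapping through field ids, it runs every configured column name through AvroSchemaUtil.makeCompatibleName, the same helper the writer uses to sanitize names that are not valid Avro identifiers. A quick sketch of what that helper does (the expected outputs in the comments follow the _x<hex> escaping visible in the test literal "_incompatible_x2Dname"):

    import org.apache.iceberg.avro.AvroSchemaUtil;

    public class CompatibleNameSketch {
      public static void main(String[] args) {
        // '-' is not a valid Avro name character, so it is escaped as _x plus
        // its hex code point (0x2D).
        System.out.println(AvroSchemaUtil.makeCompatibleName("int-decimal")); // int_x2Ddecimal

        // Already-valid names pass through unchanged.
        System.out.println(AvroSchemaUtil.makeCompatibleName("int_decimal")); // int_decimal
      }
    }

PATCH 4 below reverts to the field-id mapping from PATCH 1 (now using findField), presumably because resolving through ids targets whatever path the file actually contains, while sanitizing the property key hard-codes the Avro renaming convention into the configuration path.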
From 3246521cedbc81e3e5e81957ebfc33e2b09cc164 Mon Sep 17 00:00:00 2001
From: huaxingao
Date: Thu, 9 Jan 2025 17:44:22 -0800
Subject: [PATCH 4/4] revert to previous approach

---
 .../org/apache/iceberg/parquet/Parquet.java   | 74 +++++++++++++------
 .../parquet/TestBloomRowGroupFilter.java      |  2 +-
 2 files changed, 51 insertions(+), 25 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index b3fda2554abc..310435209bac 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -55,6 +55,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -95,6 +96,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.PropertyUtil;
@@ -115,8 +117,12 @@
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Parquet {
+  private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);
+
   private Parquet() {}
 
   private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
@@ -266,6 +272,43 @@ private WriteBuilder createContextFunc(
       return this;
     }
 
+    private void setBloomFilterConfig(
+        Context context,
+        MessageType parquetSchema,
+        BiConsumer<String, Boolean> withBloomFilterEnabled,
+        BiConsumer<String, Double> withBloomFilterFPP) {
+
+      Map<Integer, String> fieldIdToParquetPath =
+          parquetSchema.getColumns().stream()
+              .collect(
+                  Collectors.toMap(
+                      col -> col.getPrimitiveType().getId().intValue(),
+                      col -> String.join(".", col.getPath())));
+
+      context
+          .columnBloomFilterEnabled()
+          .forEach(
+              (colPath, isEnabled) -> {
+                Types.NestedField fieldId = schema.findField(colPath);
+                if (fieldId == null) {
+                  LOG.warn("Skipping bloom filter config for missing field: {}", colPath);
+                  return;
+                }
+
+                String parquetColumnPath = fieldIdToParquetPath.get(fieldId.fieldId());
+                if (parquetColumnPath == null) {
+                  LOG.warn("Skipping bloom filter config for missing field: {}", fieldId);
+                  return;
+                }
+
+                withBloomFilterEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled));
+                String fpp = context.columnBloomFilterFpp().get(colPath);
+                if (fpp != null) {
+                  withBloomFilterFPP.accept(parquetColumnPath, Double.parseDouble(fpp));
+                }
+              });
+    }
+
     public <D> FileAppender<D> build() throws IOException {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be null");
@@ -285,8 +328,6 @@ public <D> FileAppender<D> build() throws IOException {
       int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
       int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
      int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
-      Map<String, String> columnBloomFilterFpp = context.columnBloomFilterFpp();
-      Map<String, String> columnBloomFilterEnabled = context.columnBloomFilterEnabled();
       boolean dictionaryEnabled = context.dictionaryEnabled();
 
       if (compressionLevel != null) {
@@ -343,17 +384,8 @@ public <D> FileAppender<D> build() throws IOException {
               .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
               .withMaxBloomFilterBytes(bloomFilterMaxBytes);
 
-      for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
-        String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
-        String bloomEnabled = entry.getValue();
-        propsBuilder.withBloomFilterEnabled(colPath, Boolean.parseBoolean(bloomEnabled));
-      }
-
-      for (Map.Entry<String, String> entry : columnBloomFilterFpp.entrySet()) {
-        String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
-        String fpp = entry.getValue();
-        propsBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
-      }
+      setBloomFilterConfig(
+          context, type, propsBuilder::withBloomFilterEnabled, propsBuilder::withBloomFilterFPP);
 
       ParquetProperties parquetProperties = propsBuilder.build();
 
@@ -386,17 +418,11 @@ public <D> FileAppender<D> build() throws IOException {
               .withDictionaryPageSize(dictionaryPageSize)
               .withEncryption(fileEncryptionProperties);
 
-      for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
-        String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
-        String bloomEnabled = entry.getValue();
-        parquetWriteBuilder.withBloomFilterEnabled(colPath, Boolean.parseBoolean(bloomEnabled));
-      }
-
-      for (Map.Entry<String, String> entry : columnBloomFilterFpp.entrySet()) {
-        String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
-        String fpp = entry.getValue();
-        parquetWriteBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
-      }
+      setBloomFilterConfig(
+          context,
+          type,
+          parquetWriteBuilder::withBloomFilterEnabled,
+          parquetWriteBuilder::withBloomFilterFPP);
 
       return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
     }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
index 2812d915176c..bfa511c91279 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
@@ -195,7 +195,7 @@ public void createInputFile() throws IOException {
     // build struct field schema
     org.apache.avro.Schema structSchema = AvroSchemaUtil.convert(UNDERSCORE_STRUCT_FIELD_TYPE);
-    String compatibleFieldName = AvroSchemaUtil.makeCompatibleName("_incompatible-name");
+    String compatibleFieldName = "_incompatible_x2Dname";
 
     OutputFile outFile = Files.localOutput(temp);
     try (FileAppender<GenericData.Record> appender =