From 0f509d2d678db2d7322dafded58ec0ca6d7fb268 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Mon, 22 Jan 2024 10:00:21 -0800
Subject: [PATCH] Parquet: Add system config for unsafe Parquet ID fallback. (#9324)

Co-authored-by: Fokko Driesprong
---
 .../org/apache/iceberg/SystemConfigs.java     | 17 +++++++++++++--
 .../apache/iceberg/mapping/NameMapping.java   |  5 +++++
 .../org/apache/iceberg/parquet/Parquet.java   | 21 +++++++++++--------
 3 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/SystemConfigs.java b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
index feac1f61a13e..9cb345b44480 100644
--- a/core/src/main/java/org/apache/iceberg/SystemConfigs.java
+++ b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
@@ -72,6 +72,19 @@ private SystemConfigs() {}
           8,
           Integer::parseUnsignedInt);
 
+  /** @deprecated will be removed in 2.0.0; use name mapping instead */
+  @Deprecated
+  public static final ConfigEntry<Boolean> NETFLIX_UNSAFE_PARQUET_ID_FALLBACK_ENABLED =
+      new ConfigEntry<>(
+          "iceberg.netflix.unsafe-parquet-id-fallback.enabled",
+          "ICEBERG_NETFLIX_UNSAFE_PARQUET_ID_FALLBACK_ENABLED",
+          true,
+          s -> {
+            LOG.warn(
+                "Fallback ID assignment in Parquet is UNSAFE and will be removed in 2.0.0. Use name mapping instead.");
+            return Boolean.parseBoolean(s);
+          });
+
   public static class ConfigEntry<T> {
     private final String propertyKey;
     private final String envKey;
@@ -101,13 +114,13 @@ public final T defaultValue() {
 
     public final T value() {
       if (lazyValue == null) {
-        lazyValue = getValue();
+        lazyValue = produceValue();
       }
 
       return lazyValue;
     }
 
-    private T getValue() {
+    private T produceValue() {
       String value = System.getProperty(propertyKey);
       if (value == null) {
         value = System.getenv(envKey);
diff --git a/core/src/main/java/org/apache/iceberg/mapping/NameMapping.java b/core/src/main/java/org/apache/iceberg/mapping/NameMapping.java
index 642a77a4f2ea..5ca2f75793e4 100644
--- a/core/src/main/java/org/apache/iceberg/mapping/NameMapping.java
+++ b/core/src/main/java/org/apache/iceberg/mapping/NameMapping.java
@@ -28,6 +28,11 @@
 /** Represents a mapping from external schema names to Iceberg type IDs. */
 public class NameMapping implements Serializable {
   private static final Joiner DOT = Joiner.on('.');
+  private static final NameMapping EMPTY = NameMapping.of();
+
+  public static NameMapping empty() {
+    return EMPTY;
+  }
 
   public static NameMapping of(MappedField... fields) {
     return new NameMapping(MappedFields.of(ImmutableList.copyOf(fields)));
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index a19556c369ae..d591041d19c3 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -66,6 +66,7 @@
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.SystemConfigs;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
@@ -1166,27 +1167,29 @@ public CloseableIterable build() {
 
       ParquetReadOptions options = optionsBuilder.build();
 
+      NameMapping mapping;
+      if (nameMapping != null) {
+        mapping = nameMapping;
+      } else if (SystemConfigs.NETFLIX_UNSAFE_PARQUET_ID_FALLBACK_ENABLED.value()) {
+        mapping = null;
+      } else {
+        mapping = NameMapping.empty();
+      }
+
       if (batchedReaderFunc != null) {
         return new VectorizedParquetReader<>(
             file,
             schema,
             options,
             batchedReaderFunc,
-            nameMapping,
+            mapping,
             filter,
             reuseContainers,
             caseSensitive,
             maxRecordsPerBatch);
       } else {
         return new org.apache.iceberg.parquet.ParquetReader<>(
-            file,
-            schema,
-            options,
-            readerFunc,
-            nameMapping,
-            filter,
-            reuseContainers,
-            caseSensitive);
+            file, schema, options, readerFunc, mapping, filter, reuseContainers, caseSensitive);
       }
     }
 