Skip to content

Commit

Permalink
Parquet: Add system config for unsafe Parquet ID fallback. (#9324)
Browse files Browse the repository at this point in the history
Co-authored-by: Fokko Driesprong <[email protected]>
  • Loading branch information
rdblue and Fokko authored Jan 22, 2024
1 parent 26efa7a commit 0f509d2
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 11 deletions.
17 changes: 15 additions & 2 deletions core/src/main/java/org/apache/iceberg/SystemConfigs.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@ private SystemConfigs() {}
8,
Integer::parseUnsignedInt);

/**
 * Toggle for the unsafe fallback ID assignment used when reading Parquet files without a name
 * mapping.
 *
 * <p>The default is {@code true}. A configured string is converted with {@link
 * Boolean#parseBoolean(String)}, so only a case-insensitive {@code "true"} enables the flag; any
 * other text yields {@code false}.
 *
 * <p>NOTE(review): the parser logs its warning every time it runs, including when the configured
 * value parses to {@code false} -- confirm that warning on an explicit opt-out is intended.
 *
 * @deprecated will be removed in 2.0.0; use name mapping instead
 */
@Deprecated
public static final ConfigEntry<Boolean> NETFLIX_UNSAFE_PARQUET_ID_FALLBACK_ENABLED =
new ConfigEntry<>(
// presumably the JVM system property key -- confirm against the ConfigEntry constructor
"iceberg.netflix.unsafe-parquet-id-fallback.enabled",
// presumably the environment variable key -- confirm against the ConfigEntry constructor
"ICEBERG_NETFLIX_UNSAFE_PARQUET_ID_FALLBACK_ENABLED",
// default value: the fallback stays enabled unless explicitly configured otherwise
true,
s -> {
// Warn unconditionally; the message does not depend on what the string parses to.
LOG.warn(
"Fallback ID assignment in Parquet is UNSAFE and will be removed in 2.0.0. Use name mapping instead.");
return Boolean.parseBoolean(s);
});

public static class ConfigEntry<T> {
private final String propertyKey;
private final String envKey;
Expand Down Expand Up @@ -101,13 +114,13 @@ public final T defaultValue() {

public final T value() {
if (lazyValue == null) {
lazyValue = getValue();
lazyValue = produceValue();
}

return lazyValue;
}

private T getValue() {
private T produceValue() {
String value = System.getProperty(propertyKey);
if (value == null) {
value = System.getenv(envKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
/** Represents a mapping from external schema names to Iceberg type IDs. */
public class NameMapping implements Serializable {
private static final Joiner DOT = Joiner.on('.');
// Shared instance built once from of() with no fields; handed out by empty().
private static final NameMapping EMPTY = NameMapping.of();

/**
 * Returns a name mapping that contains no mapped fields.
 *
 * <p>Every call returns the same shared instance rather than allocating a new mapping.
 *
 * @return the shared empty {@code NameMapping}
 */
public static NameMapping empty() {
return EMPTY;
}

public static NameMapping of(MappedField... fields) {
return new NameMapping(MappedFields.of(ImmutableList.copyOf(fields)));
Expand Down
21 changes: 12 additions & 9 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.SystemConfigs;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
Expand Down Expand Up @@ -1166,27 +1167,29 @@ public <D> CloseableIterable<D> build() {

ParquetReadOptions options = optionsBuilder.build();

NameMapping mapping;
if (nameMapping != null) {
mapping = nameMapping;
} else if (SystemConfigs.NETFLIX_UNSAFE_PARQUET_ID_FALLBACK_ENABLED.value()) {
mapping = null;
} else {
mapping = NameMapping.empty();
}

if (batchedReaderFunc != null) {
return new VectorizedParquetReader<>(
file,
schema,
options,
batchedReaderFunc,
nameMapping,
mapping,
filter,
reuseContainers,
caseSensitive,
maxRecordsPerBatch);
} else {
return new org.apache.iceberg.parquet.ParquetReader<>(
file,
schema,
options,
readerFunc,
nameMapping,
filter,
reuseContainers,
caseSensitive);
file, schema, options, readerFunc, mapping, filter, reuseContainers, caseSensitive);
}
}

Expand Down

0 comments on commit 0f509d2

Please sign in to comment.