Skip to content

Commit

Permalink
Spark 3.4: Fix write and SQL options to override delete file compression config (#8438)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerqi authored Sep 7, 2023
1 parent 38e1c33 commit 3ed03c6
Show file tree
Hide file tree
Showing 6 changed files with 447 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
import static org.apache.iceberg.DistributionMode.RANGE;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
Expand Down Expand Up @@ -425,7 +431,107 @@ public String branch() {
return branch;
}

public String parquetCompressionCodec() {
public Map<String, String> writeProperties() {
Map<String, String> writeProperties = Maps.newHashMap();
writeProperties.putAll(dataWriteProperties());
writeProperties.putAll(deleteWriteProperties());
return writeProperties;
}

/**
 * Collects compression settings for data files, keyed by the corresponding table property name
 * for the configured data file format.
 */
private Map<String, String> dataWriteProperties() {
  Map<String, String> props = Maps.newHashMap();

  switch (dataFileFormat()) {
    case PARQUET:
      props.put(PARQUET_COMPRESSION, parquetCompressionCodec());
      String parquetLevel = parquetCompressionLevel();
      if (parquetLevel != null) {
        props.put(PARQUET_COMPRESSION_LEVEL, parquetLevel);
      }
      break;

    case AVRO:
      props.put(AVRO_COMPRESSION, avroCompressionCodec());
      String avroLevel = avroCompressionLevel();
      if (avroLevel != null) {
        props.put(AVRO_COMPRESSION_LEVEL, avroLevel);
      }
      break;

    case ORC:
      props.put(ORC_COMPRESSION, orcCompressionCodec());
      props.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy());
      break;

    default:
      // other formats carry no compression-related write properties
  }

  return props;
}

/**
 * Collects compression settings for delete files, keyed by the delete-specific table property
 * names for the configured delete file format.
 *
 * <p>Each delete-specific value falls back to the corresponding data-file value when the
 * delete-specific option, session conf, and table property are all unset.
 */
private Map<String, String> deleteWriteProperties() {
  Map<String, String> props = Maps.newHashMap();

  switch (deleteFileFormat()) {
    case PARQUET:
      setWritePropertyWithFallback(
          props,
          DELETE_PARQUET_COMPRESSION,
          deleteParquetCompressionCodec(),
          parquetCompressionCodec());
      setWritePropertyWithFallback(
          props,
          DELETE_PARQUET_COMPRESSION_LEVEL,
          deleteParquetCompressionLevel(),
          parquetCompressionLevel());
      break;

    case AVRO:
      setWritePropertyWithFallback(
          props, DELETE_AVRO_COMPRESSION, deleteAvroCompressionCodec(), avroCompressionCodec());
      setWritePropertyWithFallback(
          props,
          DELETE_AVRO_COMPRESSION_LEVEL,
          deleteAvroCompressionLevel(),
          avroCompressionLevel());
      break;

    case ORC:
      setWritePropertyWithFallback(
          props, DELETE_ORC_COMPRESSION, deleteOrcCompressionCodec(), orcCompressionCodec());
      setWritePropertyWithFallback(
          props,
          DELETE_ORC_COMPRESSION_STRATEGY,
          deleteOrcCompressionStrategy(),
          orcCompressionStrategy());
      break;

    default:
      // other formats carry no compression-related write properties
  }

  return props;
}

/**
 * Puts {@code value} under {@code key}, using {@code fallbackValue} when {@code value} is null;
 * leaves the map untouched when both are null.
 */
private void setWritePropertyWithFallback(
    Map<String, String> writeProperties, String key, String value, String fallbackValue) {
  String resolved = value != null ? value : fallbackValue;
  if (resolved != null) {
    writeProperties.put(key, resolved);
  }
}

private String parquetCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
Expand All @@ -435,7 +541,16 @@ public String parquetCompressionCodec() {
.parse();
}

public String parquetCompressionLevel() {
// Parquet codec for delete files; consulted in chain order (write option, then session conf,
// then the delete-specific table property — presumably highest-precedence first, per the
// confParser convention used throughout this class). Returns null when none is set so the
// caller can fall back to the data-file codec.
private String deleteParquetCompressionCodec() {
  return confParser
      .stringConf()
      .option(SparkWriteOptions.COMPRESSION_CODEC)
      .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
      .tableProperty(DELETE_PARQUET_COMPRESSION)
      .parseOptional();
}

private String parquetCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
Expand All @@ -445,7 +560,16 @@ public String parquetCompressionLevel() {
.parseOptional();
}

public String avroCompressionCodec() {
// Parquet compression level for delete files; same option/session-conf/table-property chain as
// the codec, but against DELETE_PARQUET_COMPRESSION_LEVEL. Null when unset (caller falls back
// to the data-file level).
private String deleteParquetCompressionLevel() {
  return confParser
      .stringConf()
      .option(SparkWriteOptions.COMPRESSION_LEVEL)
      .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
      .tableProperty(DELETE_PARQUET_COMPRESSION_LEVEL)
      .parseOptional();
}

private String avroCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
Expand All @@ -455,7 +579,16 @@ public String avroCompressionCodec() {
.parse();
}

public String avroCompressionLevel() {
// Avro codec for delete files; resolved from the write option, session conf, and the
// delete-specific table property. Null when unset (caller falls back to the data-file codec).
private String deleteAvroCompressionCodec() {
  return confParser
      .stringConf()
      .option(SparkWriteOptions.COMPRESSION_CODEC)
      .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
      .tableProperty(DELETE_AVRO_COMPRESSION)
      .parseOptional();
}

private String avroCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
Expand All @@ -465,7 +598,16 @@ public String avroCompressionLevel() {
.parseOptional();
}

public String orcCompressionCodec() {
// Avro compression level for delete files; resolved from the write option, session conf, and
// the delete-specific table property. Null when unset (caller falls back to the data-file level).
private String deleteAvroCompressionLevel() {
  return confParser
      .stringConf()
      .option(SparkWriteOptions.COMPRESSION_LEVEL)
      .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
      .tableProperty(DELETE_AVRO_COMPRESSION_LEVEL)
      .parseOptional();
}

private String orcCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
Expand All @@ -475,7 +617,16 @@ public String orcCompressionCodec() {
.parse();
}

public String orcCompressionStrategy() {
// ORC codec for delete files; resolved from the write option, session conf, and the
// delete-specific table property. Null when unset (caller falls back to the data-file codec).
private String deleteOrcCompressionCodec() {
  return confParser
      .stringConf()
      .option(SparkWriteOptions.COMPRESSION_CODEC)
      .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
      .tableProperty(DELETE_ORC_COMPRESSION)
      .parseOptional();
}

private String orcCompressionStrategy() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_STRATEGY)
Expand All @@ -485,35 +636,12 @@ public String orcCompressionStrategy() {
.parse();
}

public Map<String, String> writeProperties(FileFormat format) {
Map<String, String> writeProperties = Maps.newHashMap();

switch (format) {
case PARQUET:
writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
String parquetCompressionLevel = parquetCompressionLevel();
if (parquetCompressionLevel != null) {
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}
break;

case AVRO:
writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec());
String avroCompressionLevel = avroCompressionLevel();
if (avroCompressionLevel != null) {
writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel());
}
break;

case ORC:
writeProperties.put(ORC_COMPRESSION, orcCompressionCodec());
writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy());
break;

default:
// skip
}

return writeProperties;
// ORC compression strategy for delete files; resolved from the write option, session conf, and
// the delete-specific table property. Null when unset (caller falls back to the data-file
// strategy).
private String deleteOrcCompressionStrategy() {
  return confParser
      .stringConf()
      .option(SparkWriteOptions.COMPRESSION_STRATEGY)
      .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY)
      .tableProperty(DELETE_ORC_COMPRESSION_STRATEGY)
      .parseOptional();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public class SparkPositionDeletesRewrite implements Write {
this.fileSetId = writeConf.rewrittenFileSetId();
this.specId = specId;
this.partition = partition;
this.writeProperties = writeConf.writeProperties(format);
this.writeProperties = writeConf.writeProperties();
}

@Override
Expand Down Expand Up @@ -221,6 +221,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
.deleteFileFormat(format)
.positionDeleteRowSchema(positionDeleteRowSchema)
.positionDeleteSparkType(deleteSparkType)
.writeProperties(writeProperties)
.build();
SparkFileWriterFactory writerFactoryWithoutRow =
SparkFileWriterFactory.builderFor(table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
this.writeRequirements = writeConf.positionDeltaRequirements(command);
this.context = new Context(dataSchema, writeConf, info, writeRequirements);
this.writeProperties = writeConf.writeProperties(context.dataFileFormat);
this.writeProperties = writeConf.writeProperties();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
this.writeRequirements = writeRequirements;
this.outputSpecId = writeConf.outputSpecId();
this.writeProperties = writeConf.writeProperties(format);
this.writeProperties = writeConf.writeProperties();
}

@Override
Expand Down
Loading

0 comments on commit 3ed03c6

Please sign in to comment.