From 0b1b6241dd334d6e25ee83cd6c0bbde7b337da7f Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Wed, 27 Sep 2023 13:08:41 -0700 Subject: [PATCH] Spark 3.5: Increase default advisory partition size for writes (#8660) --- .../org/apache/iceberg/TableProperties.java | 3 + .../iceberg/spark/extensions/TestDelete.java | 30 +++- .../iceberg/spark/extensions/TestMerge.java | 36 ++-- .../iceberg/spark/extensions/TestUpdate.java | 30 +++- .../iceberg/spark/SparkCompressionUtil.java | 148 ++++++++++++++++ .../apache/iceberg/spark/SparkConfParser.java | 6 +- .../iceberg/spark/SparkSQLProperties.java | 3 + .../apache/iceberg/spark/SparkWriteConf.java | 144 +++++++++++----- .../iceberg/spark/SparkWriteOptions.java | 3 + .../iceberg/spark/SparkWriteRequirements.java | 13 +- .../apache/iceberg/spark/SparkWriteUtil.java | 24 ++- .../spark/source/SparkPositionDeltaWrite.java | 15 +- .../iceberg/spark/source/SparkWrite.java | 15 +- .../spark/TestSparkCompressionUtil.java | 162 ++++++++++++++++++ 14 files changed, 543 insertions(+), 89 deletions(-) create mode 100644 spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCompressionUtil.java create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkCompressionUtil.java diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index af90303a8693..f315a0dfc99b 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -304,6 +304,9 @@ private TableProperties() {} public static final String SPARK_WRITE_ACCEPT_ANY_SCHEMA = "write.spark.accept-any-schema"; public static final boolean SPARK_WRITE_ACCEPT_ANY_SCHEMA_DEFAULT = false; + public static final String SPARK_WRITE_ADVISORY_PARTITION_SIZE_BYTES = + "write.spark.advisory-partition-size-bytes"; + public static final String SNAPSHOT_ID_INHERITANCE_ENABLED = "compatibility.snapshot-id-inheritance.enabled"; public static final boolean SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT = false; diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java index 741b9de31a06..a5f62d73e9db 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java @@ -177,12 +177,19 @@ public void testCoalesceDelete() throws Exception { // enable AQE and set the advisory partition size big enough to trigger combining // set the number of shuffle partitions to 200 to distribute the work across reducers + // set the advisory partition size for shuffles small enough to ensure writes override it withSQLConf( ImmutableMap.of( - SQLConf.SHUFFLE_PARTITIONS().key(), "200", - SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true", - SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true", - SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"), + SQLConf.SHUFFLE_PARTITIONS().key(), + "200", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "true", + SQLConf.COALESCE_PARTITIONS_ENABLED().key(), + "true", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), + "100", + SparkSQLProperties.ADVISORY_PARTITION_SIZE, + String.valueOf(256 * 1024 * 1024)), () -> { SparkPlan plan = executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget()); @@ -238,12 +245,19 @@ public void 
testSkewDelete() throws Exception { // enable AQE and set the advisory partition size small enough to trigger a split // set the number of shuffle partitions to 2 to only have 2 reducers + // set the advisory partition size for shuffles big enough to ensure writes override it withSQLConf( ImmutableMap.of( - SQLConf.SHUFFLE_PARTITIONS().key(), "2", - SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true", - SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true", - SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"), + SQLConf.SHUFFLE_PARTITIONS().key(), + "2", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "true", + SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), + "true", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), + "256MB", + SparkSQLProperties.ADVISORY_PARTITION_SIZE, + "100"), () -> { SparkPlan plan = executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget()); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java index 3de4856e1a96..b8439a399f18 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java @@ -282,13 +282,21 @@ public void testCoalesceMerge() { // enable AQE and set the advisory partition big enough to trigger combining // set the number of shuffle partitions to 200 to distribute the work across reducers // disable broadcast joins to make sure the join triggers a shuffle + // set the advisory partition size for shuffles small enough to ensure writes override it withSQLConf( ImmutableMap.of( - SQLConf.SHUFFLE_PARTITIONS().key(), "200", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), "-1", - SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true", - SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true", - SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"), + SQLConf.SHUFFLE_PARTITIONS().key(), + "200", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), + "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "true", + SQLConf.COALESCE_PARTITIONS_ENABLED().key(), + "true", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), + "100", + SparkSQLProperties.ADVISORY_PARTITION_SIZE, + String.valueOf(256 * 1024 * 1024)), () -> { sql( "MERGE INTO %s t USING source " @@ -352,13 +360,21 @@ public void testSkewMerge() { // enable AQE and set the advisory partition size small enough to trigger a split // set the number of shuffle partitions to 2 to only have 2 reducers // set the min coalesce partition size small enough to avoid coalescing + // set the advisory partition size for shuffles big enough to ensure writes override it withSQLConf( ImmutableMap.of( - SQLConf.SHUFFLE_PARTITIONS().key(), "4", - SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE().key(), "100", - SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true", - SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true", - SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"), + SQLConf.SHUFFLE_PARTITIONS().key(), + "4", + SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE().key(), + "100", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "true", + SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), + "true", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), + "256MB", + SparkSQLProperties.ADVISORY_PARTITION_SIZE, + "100"), () -> 
{ SparkPlan plan = executeAndKeepPlan( diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java index 8e43947fd2db..5633b30ee519 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java @@ -162,12 +162,19 @@ public void testCoalesceUpdate() { // enable AQE and set the advisory partition size big enough to trigger combining // set the number of shuffle partitions to 200 to distribute the work across reducers + // set the advisory partition size for shuffles small enough to ensure writes override it withSQLConf( ImmutableMap.of( - SQLConf.SHUFFLE_PARTITIONS().key(), "200", - SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true", - SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true", - SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"), + SQLConf.SHUFFLE_PARTITIONS().key(), + "200", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "true", + SQLConf.COALESCE_PARTITIONS_ENABLED().key(), + "true", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), + "100", + SparkSQLProperties.ADVISORY_PARTITION_SIZE, + String.valueOf(256 * 1024 * 1024)), () -> { SparkPlan plan = executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget()); @@ -226,12 +233,19 @@ public void testSkewUpdate() { // enable AQE and set the advisory partition size small enough to trigger a split // set the number of shuffle partitions to 2 to only have 2 reducers + // set the advisory partition size for shuffles big enough to ensure writes override it withSQLConf( ImmutableMap.of( - SQLConf.SHUFFLE_PARTITIONS().key(), "2", - SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true", - SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true", - SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"), + SQLConf.SHUFFLE_PARTITIONS().key(), + "2", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "true", + SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), + "true", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), + "256MB", + SparkSQLProperties.ADVISORY_PARTITION_SIZE, + "100"), () -> { SparkPlan plan = executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget()); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCompressionUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCompressionUtil.java new file mode 100644 index 000000000000..8f00b7f8301d --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCompressionUtil.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import java.util.Locale; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Pair; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; + +class SparkCompressionUtil { + + private static final String LZ4 = "lz4"; + private static final String ZSTD = "zstd"; + private static final String GZIP = "gzip"; + private static final String ZLIB = "zlib"; + private static final String SNAPPY = "snappy"; + private static final String NONE = "none"; + + // an internal Spark config that controls whether shuffle data is compressed + private static final String SHUFFLE_COMPRESSION_ENABLED = "spark.shuffle.compress"; + private static final boolean SHUFFLE_COMPRESSION_ENABLED_DEFAULT = true; + + // an internal Spark config that controls what compression codec is used + private static final String SPARK_COMPRESSION_CODEC = "spark.io.compression.codec"; + private static final String SPARK_COMPRESSION_CODEC_DEFAULT = "lz4"; + + private static final double DEFAULT_COLUMNAR_COMPRESSION = 2; + private static final Map<Pair<String, String>, Double> COLUMNAR_COMPRESSIONS = + initColumnarCompressions(); + + private static final double DEFAULT_ROW_BASED_COMPRESSION = 1; + private static final Map<Pair<String, String>, Double> ROW_BASED_COMPRESSIONS = + initRowBasedCompressions(); + + private SparkCompressionUtil() {} + + /** + * Estimates how much the data in shuffle map files will compress once it is written to disk using + * a particular file format and codec. + */ + public static double shuffleCompressionRatio( + SparkSession spark, FileFormat outputFileFormat, String outputCodec) { + if (outputFileFormat == FileFormat.ORC || outputFileFormat == FileFormat.PARQUET) { + return columnarCompression(shuffleCodec(spark), outputCodec); + } else if (outputFileFormat == FileFormat.AVRO) { + return rowBasedCompression(shuffleCodec(spark), outputCodec); + } else { + return 1.0; + } + } + + private static String shuffleCodec(SparkSession spark) { + SparkConf sparkConf = spark.sparkContext().conf(); + return shuffleCompressionEnabled(sparkConf) ? sparkCodec(sparkConf) : NONE; + } + + private static boolean shuffleCompressionEnabled(SparkConf sparkConf) { + return sparkConf.getBoolean(SHUFFLE_COMPRESSION_ENABLED, SHUFFLE_COMPRESSION_ENABLED_DEFAULT); + } + + private static String sparkCodec(SparkConf sparkConf) { + return sparkConf.get(SPARK_COMPRESSION_CODEC, SPARK_COMPRESSION_CODEC_DEFAULT); + } + + private static double columnarCompression(String shuffleCodec, String outputCodec) { + Pair<String, String> key = Pair.of(normalize(shuffleCodec), normalize(outputCodec)); + return COLUMNAR_COMPRESSIONS.getOrDefault(key, DEFAULT_COLUMNAR_COMPRESSION); + } + + private static double rowBasedCompression(String shuffleCodec, String outputCodec) { + Pair<String, String> key = Pair.of(normalize(shuffleCodec), normalize(outputCodec)); + return ROW_BASED_COMPRESSIONS.getOrDefault(key, DEFAULT_ROW_BASED_COMPRESSION); + } + + private static String normalize(String value) { + return value != null ?
value.toLowerCase(Locale.ROOT) : null; + } + + private static Map<Pair<String, String>, Double> initColumnarCompressions() { + Map<Pair<String, String>, Double> compressions = Maps.newHashMap(); + + compressions.put(Pair.of(NONE, ZSTD), 4.0); + compressions.put(Pair.of(NONE, GZIP), 4.0); + compressions.put(Pair.of(NONE, ZLIB), 4.0); + compressions.put(Pair.of(NONE, SNAPPY), 3.0); + compressions.put(Pair.of(NONE, LZ4), 3.0); + + compressions.put(Pair.of(ZSTD, ZSTD), 2.0); + compressions.put(Pair.of(ZSTD, GZIP), 2.0); + compressions.put(Pair.of(ZSTD, ZLIB), 2.0); + compressions.put(Pair.of(ZSTD, SNAPPY), 1.5); + compressions.put(Pair.of(ZSTD, LZ4), 1.5); + + compressions.put(Pair.of(SNAPPY, ZSTD), 3.0); + compressions.put(Pair.of(SNAPPY, GZIP), 3.0); + compressions.put(Pair.of(SNAPPY, ZLIB), 3.0); + compressions.put(Pair.of(SNAPPY, SNAPPY), 2.0); + compressions.put(Pair.of(SNAPPY, LZ4), 2.0); + + compressions.put(Pair.of(LZ4, ZSTD), 3.0); + compressions.put(Pair.of(LZ4, GZIP), 3.0); + compressions.put(Pair.of(LZ4, ZLIB), 3.0); + compressions.put(Pair.of(LZ4, SNAPPY), 2.0); + compressions.put(Pair.of(LZ4, LZ4), 2.0); + + return compressions; + } + + private static Map<Pair<String, String>, Double> initRowBasedCompressions() { + Map<Pair<String, String>, Double> compressions = Maps.newHashMap(); + + compressions.put(Pair.of(NONE, ZSTD), 2.0); + compressions.put(Pair.of(NONE, GZIP), 2.0); + compressions.put(Pair.of(NONE, ZLIB), 2.0); + + compressions.put(Pair.of(ZSTD, SNAPPY), 0.5); + compressions.put(Pair.of(ZSTD, LZ4), 0.5); + + compressions.put(Pair.of(SNAPPY, ZSTD), 1.5); + compressions.put(Pair.of(SNAPPY, GZIP), 1.5); + compressions.put(Pair.of(SNAPPY, ZLIB), 1.5); + + compressions.put(Pair.of(LZ4, ZSTD), 1.5); + compressions.put(Pair.of(LZ4, GZIP), 1.5); + compressions.put(Pair.of(LZ4, ZLIB), 1.5); + + return compressions; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java index eaa301524c87..d86246b1e616 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java @@ -106,7 +106,7 @@ public int parse() { } public Integer parseOptional() { - return parse(Integer::parseInt, null); + return parse(Integer::parseInt, defaultValue); } } @@ -129,7 +129,7 @@ public long parse() { } public Long parseOptional() { - return parse(Long::parseLong, null); + return parse(Long::parseLong, defaultValue); } } @@ -152,7 +152,7 @@ public String parse() { } public String parseOptional() { - return parse(Function.identity(), null); + return parse(Function.identity(), defaultValue); } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index 49ab6e4c4ef8..11d9cc296cbe 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -64,4 +64,7 @@ private SparkSQLProperties() {} // Overrides the delete planning mode public static final String DELETE_PLANNING_MODE = "spark.sql.iceberg.delete-planning-mode"; + + // Overrides the advisory partition size + public static final String ADVISORY_PARTITION_SIZE = "spark.sql.iceberg.advisory-partition-size"; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index
834e839874f7..27ed07fcace8 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -33,6 +33,7 @@ import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; +import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE; import java.util.Locale; import java.util.Map; @@ -75,6 +76,10 @@ public class SparkWriteConf { private static final Logger LOG = LoggerFactory.getLogger(SparkWriteConf.class); + private static final long DATA_FILE_SIZE = 128 * 1024 * 1024; // 128 MB + private static final long DELETE_FILE_SIZE = 32 * 1024 * 1024; // 32 MB + + private final SparkSession spark; private final Table table; private final String branch; private final RuntimeConfig sessionConf; @@ -87,6 +92,7 @@ public SparkWriteConf(SparkSession spark, Table table, Map write public SparkWriteConf( SparkSession spark, Table table, String branch, Map writeOptions) { + this.spark = spark; this.table = table; this.branch = branch; this.sessionConf = spark.conf(); @@ -163,6 +169,19 @@ public FileFormat dataFileFormat() { return FileFormat.fromString(valueAsString); } + private String dataCompressionCodec() { + switch (dataFileFormat()) { + case PARQUET: + return parquetCompressionCodec(); + case AVRO: + return avroCompressionCodec(); + case ORC: + return orcCompressionCodec(); + default: + return null; + } + } + public long targetDataFileSize() { return confParser .longConf() @@ -200,6 +219,19 @@ public FileFormat deleteFileFormat() { return valueAsString != null ? FileFormat.fromString(valueAsString) : dataFileFormat(); } + private String deleteCompressionCodec() { + switch (deleteFileFormat()) { + case PARQUET: + return deleteParquetCompressionCodec(); + case AVRO: + return deleteAvroCompressionCodec(); + case ORC: + return deleteOrcCompressionCodec(); + default: + return null; + } + } + public long targetDeleteFileSize() { return confParser .longConf() @@ -236,7 +268,8 @@ public SparkWriteRequirements writeRequirements() { return SparkWriteRequirements.EMPTY; } - return SparkWriteUtil.writeRequirements(table, distributionMode(), fanoutWriterEnabled()); + return SparkWriteUtil.writeRequirements( + table, distributionMode(), fanoutWriterEnabled(), dataAdvisoryPartitionSize()); } @VisibleForTesting @@ -284,7 +317,11 @@ public SparkWriteRequirements copyOnWriteRequirements(Command command) { } return SparkWriteUtil.copyOnWriteRequirements( - table, command, copyOnWriteDistributionMode(command), fanoutWriterEnabled()); + table, + command, + copyOnWriteDistributionMode(command), + fanoutWriterEnabled(), + dataAdvisoryPartitionSize()); } @VisibleForTesting @@ -308,7 +345,11 @@ public SparkWriteRequirements positionDeltaRequirements(Command command) { } return SparkWriteUtil.positionDeltaRequirements( - table, command, positionDeltaDistributionMode(command), fanoutWriterEnabled()); + table, + command, + positionDeltaDistributionMode(command), + fanoutWriterEnabled(), + command == DELETE ? 
deleteAdvisoryPartitionSize() : dataAdvisoryPartitionSize()); } @VisibleForTesting @@ -484,42 +525,24 @@ private Map deleteWriteProperties() { switch (deleteFormat) { case PARQUET: - setWritePropertyWithFallback( - writeProperties, - DELETE_PARQUET_COMPRESSION, - deleteParquetCompressionCodec(), - parquetCompressionCodec()); - setWritePropertyWithFallback( - writeProperties, - DELETE_PARQUET_COMPRESSION_LEVEL, - deleteParquetCompressionLevel(), - parquetCompressionLevel()); + writeProperties.put(DELETE_PARQUET_COMPRESSION, deleteParquetCompressionCodec()); + String deleteParquetCompressionLevel = deleteParquetCompressionLevel(); + if (deleteParquetCompressionLevel != null) { + writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, deleteParquetCompressionLevel); + } break; case AVRO: - setWritePropertyWithFallback( - writeProperties, - DELETE_AVRO_COMPRESSION, - deleteAvroCompressionCodec(), - avroCompressionCodec()); - setWritePropertyWithFallback( - writeProperties, - DELETE_AVRO_COMPRESSION_LEVEL, - deleteAvroCompressionLevel(), - avroCompressionLevel()); + writeProperties.put(DELETE_AVRO_COMPRESSION, deleteAvroCompressionCodec()); + String deleteAvroCompressionLevel = deleteAvroCompressionLevel(); + if (deleteAvroCompressionLevel != null) { + writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, deleteAvroCompressionLevel); + } break; case ORC: - setWritePropertyWithFallback( - writeProperties, - DELETE_ORC_COMPRESSION, - deleteOrcCompressionCodec(), - orcCompressionCodec()); - setWritePropertyWithFallback( - writeProperties, - DELETE_ORC_COMPRESSION_STRATEGY, - deleteOrcCompressionStrategy(), - orcCompressionStrategy()); + writeProperties.put(DELETE_ORC_COMPRESSION, deleteOrcCompressionCodec()); + writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, deleteOrcCompressionStrategy()); break; default: @@ -529,15 +552,6 @@ private Map deleteWriteProperties() { return writeProperties; } - private void setWritePropertyWithFallback( - Map writeProperties, String key, String value, String fallbackValue) { - if (value != null) { - writeProperties.put(key, value); - } else if (fallbackValue != null) { - writeProperties.put(key, fallbackValue); - } - } - private String parquetCompressionCodec() { return confParser .stringConf() @@ -554,7 +568,8 @@ private String deleteParquetCompressionCodec() { .option(SparkWriteOptions.COMPRESSION_CODEC) .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) .tableProperty(DELETE_PARQUET_COMPRESSION) - .parseOptional(); + .defaultValue(parquetCompressionCodec()) + .parse(); } private String parquetCompressionLevel() { @@ -573,6 +588,7 @@ private String deleteParquetCompressionLevel() { .option(SparkWriteOptions.COMPRESSION_LEVEL) .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) .tableProperty(DELETE_PARQUET_COMPRESSION_LEVEL) + .defaultValue(parquetCompressionLevel()) .parseOptional(); } @@ -592,7 +608,8 @@ private String deleteAvroCompressionCodec() { .option(SparkWriteOptions.COMPRESSION_CODEC) .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) .tableProperty(DELETE_AVRO_COMPRESSION) - .parseOptional(); + .defaultValue(avroCompressionCodec()) + .parse(); } private String avroCompressionLevel() { @@ -611,6 +628,7 @@ private String deleteAvroCompressionLevel() { .option(SparkWriteOptions.COMPRESSION_LEVEL) .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL) .tableProperty(DELETE_AVRO_COMPRESSION_LEVEL) + .defaultValue(avroCompressionLevel()) .parseOptional(); } @@ -630,7 +648,8 @@ private String deleteOrcCompressionCodec() { 
.option(SparkWriteOptions.COMPRESSION_CODEC) .sessionConf(SparkSQLProperties.COMPRESSION_CODEC) .tableProperty(DELETE_ORC_COMPRESSION) - .parseOptional(); + .defaultValue(orcCompressionCodec()) + .parse(); } private String orcCompressionStrategy() { @@ -649,6 +668,39 @@ private String deleteOrcCompressionStrategy() { .option(SparkWriteOptions.COMPRESSION_STRATEGY) .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY) .tableProperty(DELETE_ORC_COMPRESSION_STRATEGY) - .parseOptional(); + .defaultValue(orcCompressionStrategy()) + .parse(); + } + + private long dataAdvisoryPartitionSize() { + long defaultValue = + advisoryPartitionSize(DATA_FILE_SIZE, dataFileFormat(), dataCompressionCodec()); + return advisoryPartitionSize(defaultValue); + } + + private long deleteAdvisoryPartitionSize() { + long defaultValue = + advisoryPartitionSize(DELETE_FILE_SIZE, deleteFileFormat(), deleteCompressionCodec()); + return advisoryPartitionSize(defaultValue); + } + + private long advisoryPartitionSize(long defaultValue) { + return confParser + .longConf() + .option(SparkWriteOptions.ADVISORY_PARTITION_SIZE) + .sessionConf(SparkSQLProperties.ADVISORY_PARTITION_SIZE) + .tableProperty(TableProperties.SPARK_WRITE_ADVISORY_PARTITION_SIZE_BYTES) + .defaultValue(defaultValue) + .parse(); + } + + private long advisoryPartitionSize( + long expectedFileSize, FileFormat outputFileFormat, String outputCodec) { + double shuffleCompressionRatio = shuffleCompressionRatio(outputFileFormat, outputCodec); + return (long) (expectedFileSize * shuffleCompressionRatio); + } + + private double shuffleCompressionRatio(FileFormat outputFileFormat, String outputCodec) { + return SparkCompressionUtil.shuffleCompressionRatio(spark, outputFileFormat, outputCodec); } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index 4e5e42064374..48dfa44c9122 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -80,4 +80,7 @@ private SparkWriteOptions() {} public static final String COMPRESSION_CODEC = "compression-codec"; public static final String COMPRESSION_LEVEL = "compression-level"; public static final String COMPRESSION_STRATEGY = "compression-strategy"; + + // Overrides the advisory partition size + public static final String ADVISORY_PARTITION_SIZE = "advisory-partition-size"; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteRequirements.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteRequirements.java index bfdc2fe8f8ca..833e0e44e391 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteRequirements.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteRequirements.java @@ -20,20 +20,24 @@ import org.apache.spark.sql.connector.distributions.Distribution; import org.apache.spark.sql.connector.distributions.Distributions; +import org.apache.spark.sql.connector.distributions.UnspecifiedDistribution; import org.apache.spark.sql.connector.expressions.SortOrder; /** A set of requirements such as distribution and ordering reported to Spark during writes. 
*/ public class SparkWriteRequirements { public static final SparkWriteRequirements EMPTY = - new SparkWriteRequirements(Distributions.unspecified(), new SortOrder[0]); + new SparkWriteRequirements(Distributions.unspecified(), new SortOrder[0], 0); private final Distribution distribution; private final SortOrder[] ordering; + private final long advisoryPartitionSize; - SparkWriteRequirements(Distribution distribution, SortOrder[] ordering) { + SparkWriteRequirements( + Distribution distribution, SortOrder[] ordering, long advisoryPartitionSize) { this.distribution = distribution; this.ordering = ordering; + this.advisoryPartitionSize = advisoryPartitionSize; } public Distribution distribution() { @@ -47,4 +51,9 @@ public SortOrder[] ordering() { public boolean hasOrdering() { return ordering.length != 0; } + + public long advisoryPartitionSize() { + // Spark prohibits requesting a particular advisory partition size without distribution + return distribution instanceof UnspecifiedDistribution ? 0 : advisoryPartitionSize; + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java index 8f9c39de99be..0d68a0d8cdd0 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java @@ -66,11 +66,11 @@ private SparkWriteUtil() {} /** Builds requirements for batch and micro-batch writes such as append or overwrite. */ public static SparkWriteRequirements writeRequirements( - Table table, DistributionMode mode, boolean fanoutEnabled) { + Table table, DistributionMode mode, boolean fanoutEnabled, long advisoryPartitionSize) { Distribution distribution = writeDistribution(table, mode); SortOrder[] ordering = writeOrdering(table, fanoutEnabled); - return new SparkWriteRequirements(distribution, ordering); + return new SparkWriteRequirements(distribution, ordering, advisoryPartitionSize); } private static Distribution writeDistribution(Table table, DistributionMode mode) { @@ -91,14 +91,18 @@ private static Distribution writeDistribution(Table table, DistributionMode mode /** Builds requirements for copy-on-write DELETE, UPDATE, MERGE operations. */ public static SparkWriteRequirements copyOnWriteRequirements( - Table table, Command command, DistributionMode mode, boolean fanoutEnabled) { + Table table, + Command command, + DistributionMode mode, + boolean fanoutEnabled, + long advisoryPartitionSize) { if (command == DELETE || command == UPDATE) { Distribution distribution = copyOnWriteDeleteUpdateDistribution(table, mode); SortOrder[] ordering = writeOrdering(table, fanoutEnabled); - return new SparkWriteRequirements(distribution, ordering); + return new SparkWriteRequirements(distribution, ordering, advisoryPartitionSize); } else { - return writeRequirements(table, mode, fanoutEnabled); + return writeRequirements(table, mode, fanoutEnabled, advisoryPartitionSize); } } @@ -130,16 +134,20 @@ private static Distribution copyOnWriteDeleteUpdateDistribution( /** Builds requirements for merge-on-read DELETE, UPDATE, MERGE operations. 
*/ public static SparkWriteRequirements positionDeltaRequirements( - Table table, Command command, DistributionMode mode, boolean fanoutEnabled) { + Table table, + Command command, + DistributionMode mode, + boolean fanoutEnabled, + long advisoryPartitionSize) { if (command == UPDATE || command == MERGE) { Distribution distribution = positionDeltaUpdateMergeDistribution(table, mode); SortOrder[] ordering = positionDeltaUpdateMergeOrdering(table, fanoutEnabled); - return new SparkWriteRequirements(distribution, ordering); + return new SparkWriteRequirements(distribution, ordering, advisoryPartitionSize); } else { Distribution distribution = positionDeltaDeleteDistribution(table, mode); SortOrder[] ordering = fanoutEnabled ? EMPTY_ORDERING : POSITION_DELETE_ORDERING; - return new SparkWriteRequirements(distribution, ordering); + return new SparkWriteRequirements(distribution, ordering, advisoryPartitionSize); } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index a8145c2abaa0..6c0fc591ffbd 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -132,7 +132,9 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde @Override public Distribution requiredDistribution() { - return writeRequirements.distribution(); + Distribution distribution = writeRequirements.distribution(); + LOG.info("Requesting {} as write distribution for table {}", distribution, table.name()); + return distribution; } @Override @@ -142,7 +144,16 @@ public boolean distributionStrictlyRequired() { @Override public SortOrder[] requiredOrdering() { - return writeRequirements.ordering(); + SortOrder[] ordering = writeRequirements.ordering(); + LOG.info("Requesting {} as write ordering for table {}", ordering, table.name()); + return ordering; + } + + @Override + public long advisoryPartitionSizeInBytes() { + long size = writeRequirements.advisoryPartitionSize(); + LOG.info("Requesting {} bytes advisory partition size for table {}", size, table.name()); + return size; } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 802c789ce84f..97359d0a3ae9 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -134,7 +134,9 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering { @Override public Distribution requiredDistribution() { - return writeRequirements.distribution(); + Distribution distribution = writeRequirements.distribution(); + LOG.info("Requesting {} as write distribution for table {}", distribution, table.name()); + return distribution; } @Override @@ -144,7 +146,16 @@ public boolean distributionStrictlyRequired() { @Override public SortOrder[] requiredOrdering() { - return writeRequirements.ordering(); + SortOrder[] ordering = writeRequirements.ordering(); + LOG.info("Requesting {} as write ordering for table {}", ordering, table.name()); + return ordering; + } + + @Override + public long advisoryPartitionSizeInBytes() { + long size = writeRequirements.advisoryPartitionSize(); + LOG.info("Requesting {} bytes 
advisory partition size for table {}", size, table.name()); + return size; } BatchWrite asBatchAppend() { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkCompressionUtil.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkCompressionUtil.java new file mode 100644 index 000000000000..a2b48edffae4 --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkCompressionUtil.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import static org.apache.iceberg.FileFormat.AVRO; +import static org.apache.iceberg.FileFormat.METADATA; +import static org.apache.iceberg.FileFormat.ORC; +import static org.apache.iceberg.FileFormat.PARQUET; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.iceberg.FileFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.internal.config.package$; +import org.apache.spark.sql.SparkSession; +import org.junit.Before; +import org.junit.Test; + +public class TestSparkCompressionUtil { + + private SparkSession spark; + private SparkConf sparkConf; + + @Before + public void initSpark() { + this.spark = mock(SparkSession.class); + this.sparkConf = mock(SparkConf.class); + + SparkContext sparkContext = mock(SparkContext.class); + + when(spark.sparkContext()).thenReturn(sparkContext); + when(sparkContext.conf()).thenReturn(sparkConf); + } + + @Test + public void testParquetCompressionRatios() { + configureShuffle("lz4", true); + + double ratio1 = shuffleCompressionRatio(PARQUET, "zstd"); + assertThat(ratio1).isEqualTo(3.0); + + double ratio2 = shuffleCompressionRatio(PARQUET, "gzip"); + assertThat(ratio2).isEqualTo(3.0); + + double ratio3 = shuffleCompressionRatio(PARQUET, "snappy"); + assertThat(ratio3).isEqualTo(2.0); + } + + @Test + public void testOrcCompressionRatios() { + configureShuffle("lz4", true); + + double ratio1 = shuffleCompressionRatio(ORC, "zlib"); + assertThat(ratio1).isEqualTo(3.0); + + double ratio2 = shuffleCompressionRatio(ORC, "lz4"); + assertThat(ratio2).isEqualTo(2.0); + } + + @Test + public void testAvroCompressionRatios() { + configureShuffle("lz4", true); + + double ratio1 = shuffleCompressionRatio(AVRO, "gzip"); + assertThat(ratio1).isEqualTo(1.5); + + double ratio2 = shuffleCompressionRatio(AVRO, "zstd"); + assertThat(ratio2).isEqualTo(1.5); + } + + @Test + public void testCodecNameNormalization() { + configureShuffle("zStD", true); + double 
ratio = shuffleCompressionRatio(PARQUET, "ZstD"); + assertThat(ratio).isEqualTo(2.0); + } + + @Test + public void testUnknownCodecNames() { + configureShuffle("SOME_SPARK_CODEC", true); + + double ratio1 = shuffleCompressionRatio(PARQUET, "SOME_PARQUET_CODEC"); + assertThat(ratio1).isEqualTo(2.0); + + double ratio2 = shuffleCompressionRatio(ORC, "SOME_ORC_CODEC"); + assertThat(ratio2).isEqualTo(2.0); + + double ratio3 = shuffleCompressionRatio(AVRO, "SOME_AVRO_CODEC"); + assertThat(ratio3).isEqualTo(1.0); + } + + @Test + public void testOtherFileFormats() { + configureShuffle("lz4", true); + double ratio = shuffleCompressionRatio(METADATA, "zstd"); + assertThat(ratio).isEqualTo(1.0); + } + + @Test + public void testNullFileCodec() { + configureShuffle("lz4", true); + + double ratio1 = shuffleCompressionRatio(PARQUET, null); + assertThat(ratio1).isEqualTo(2.0); + + double ratio2 = shuffleCompressionRatio(ORC, null); + assertThat(ratio2).isEqualTo(2.0); + + double ratio3 = shuffleCompressionRatio(AVRO, null); + assertThat(ratio3).isEqualTo(1.0); + } + + @Test + public void testUncompressedShuffles() { + configureShuffle("zstd", false); + + double ratio1 = shuffleCompressionRatio(PARQUET, "zstd"); + assertThat(ratio1).isEqualTo(4.0); + + double ratio2 = shuffleCompressionRatio(ORC, "zlib"); + assertThat(ratio2).isEqualTo(4.0); + + double ratio3 = shuffleCompressionRatio(AVRO, "gzip"); + assertThat(ratio3).isEqualTo(2.0); + } + + @Test + public void testSparkDefaults() { + assertThat(package$.MODULE$.SHUFFLE_COMPRESS().defaultValueString()).isEqualTo("true"); + assertThat(package$.MODULE$.IO_COMPRESSION_CODEC().defaultValueString()).isEqualTo("lz4"); + } + + private void configureShuffle(String codec, boolean compress) { + when(sparkConf.getBoolean(eq("spark.shuffle.compress"), anyBoolean())).thenReturn(compress); + when(sparkConf.get(eq("spark.io.compression.codec"), anyString())).thenReturn(codec); + } + + private double shuffleCompressionRatio(FileFormat fileFormat, String codec) { + return SparkCompressionUtil.shuffleCompressionRatio(spark, fileFormat, codec); + } +}
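
Note (not part of the patch): the sketch below is a rough, self-contained illustration of how the new default advisory partition size is derived, namely the expected on-disk file size scaled by an estimated shuffle-to-file compression ratio. The 128 MB data / 32 MB delete targets and the ratio values are copied from SparkWriteConf and SparkCompressionUtil above; the class name, the string-keyed ratio map, and main() are invented for this example only, since the real code keys the lookup on a Pair of codecs read from spark.shuffle.compress and spark.io.compression.codec.

// Illustrative sketch only; assumes Java 9+ for Map.of.
import java.util.Map;

public class AdvisoryPartitionSizeSketch {

  // target on-disk sizes used by the patch as estimation input
  private static final long DATA_FILE_SIZE = 128L * 1024 * 1024; // 128 MB data files
  private static final long DELETE_FILE_SIZE = 32L * 1024 * 1024; // 32 MB delete files

  // a few (shuffle codec -> output codec) ratios for columnar formats, taken from the patch's table
  private static final Map<String, Double> COLUMNAR_RATIOS =
      Map.of("lz4->zstd", 3.0, "lz4->snappy", 2.0, "zstd->zstd", 2.0, "none->zstd", 4.0);

  static long advisoryPartitionSize(long expectedFileSize, String shuffleCodec, String outputCodec) {
    // shuffle rows take roughly `ratio` times more space than the final columnar file,
    // so the advisory partition size is scaled up to still produce files of the target size
    double ratio = COLUMNAR_RATIOS.getOrDefault(shuffleCodec + "->" + outputCodec, 2.0);
    return (long) (expectedFileSize * ratio);
  }

  public static void main(String[] args) {
    // lz4-compressed shuffle writing zstd Parquet: 128 MB * 3.0 = 384 MB advisory partition size
    System.out.println(advisoryPartitionSize(DATA_FILE_SIZE, "lz4", "zstd"));
    // position deletes target smaller files: 32 MB * 3.0 = 96 MB
    System.out.println(advisoryPartitionSize(DELETE_FILE_SIZE, "lz4", "zstd"));
  }
}

The estimate can still be overridden explicitly through the write option advisory-partition-size, the session conf spark.sql.iceberg.advisory-partition-size, or the table property write.spark.advisory-partition-size-bytes introduced by this patch; the value takes effect only when the write requests a distribution, because SparkWriteRequirements.advisoryPartitionSize() returns 0 for an unspecified distribution.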