diff --git a/docs/spark-writes.md b/docs/spark-writes.md
index 7338af0c8820..b9435674609b 100644
--- a/docs/spark-writes.md
+++ b/docs/spark-writes.md
@@ -343,7 +343,8 @@ data.writeTo("prod.db.sample").option("mergeSchema","true").append()
 Iceberg's default Spark writers require that the data in each spark task is clustered by partition values. This
 distribution is required to minimize the number of file handles that are held open while writing. By default, starting
 in Iceberg 1.2.0, Iceberg also requests that Spark pre-sort data to be written to fit this distribution. The
-request to Spark is done through the table property `write.distribution-mode` with the value `hash`.
+request to Spark is done through the table property `write.distribution-mode` with the value `hash`. Note that Spark
+doesn't respect the distribution mode in CTAS/RTAS operations before version 3.5.0.
 
 Let's go through writing the data against below sample table:
 
@@ -380,7 +381,7 @@
 write data before writing. Practically, this means that each row is hashed based on the row's partition value and then
 placed in a corresponding Spark task based upon that value. Further division and coalescing of tasks may take place because of
 [Spark's Adaptive Query planning](#controlling-file-sizes).
-* `range` - This mode requests that Spark perform a range based exchanged to shuffle the data before writing.
+* `range` - This mode requests that Spark perform a range based exchange to shuffle the data before writing.
 This is a two stage procedure which is more expensive than the `hash` mode. The first stage samples the data to be written based
 on the partition and sort columns. The second stage uses the range information to shuffle the input data into Spark tasks.
 Each task gets an exclusive range of the input data which clusters the data by partition and also globally sorts.
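For readers of the docs change above, here is a minimal sketch of how the `write.distribution-mode` property can be set from a Spark application. It assumes a running Spark session with an Iceberg catalog configured; the table name `prod.db.sample` is taken from the docs example and is otherwise hypothetical.

```java
// Minimal sketch: setting write.distribution-mode on an existing table.
// Assumes a running Spark session with an Iceberg catalog configured;
// the table name prod.db.sample mirrors the docs example.
import org.apache.spark.sql.SparkSession;

public class SetWriteDistributionMode {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("set-write-distribution-mode").getOrCreate();

    // Request a hash-based shuffle of incoming rows by partition value
    // before writing, so each task writes to only a few partitions.
    spark.sql(
        "ALTER TABLE prod.db.sample "
            + "SET TBLPROPERTIES ('write.distribution-mode' = 'hash')");

    // 'range' would instead request a two-stage range exchange (sample,
    // then shuffle into exclusive, globally sorted ranges); 'none' opts out.
    spark.stop();
  }
}
```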
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
index 1c08b1fd5ae7..74906241fa95 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
@@ -29,6 +29,8 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -46,7 +48,7 @@ public TestCreateTableAsSelect(
         "CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) "
             + "USING iceberg PARTITIONED BY (truncate(id, 3))",
         sourceName);
-    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", sourceName);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')", sourceName);
   }
 
   @After
@@ -102,6 +104,18 @@ public void testPartitionedCTAS() {
         sql("SELECT * FROM %s ORDER BY id", tableName));
   }
 
+  @Test
+  public void testCTASWriteDistributionModeNotRespected() {
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE TABLE %s USING iceberg PARTITIONED BY (bucket(2, id)) AS SELECT * FROM %s",
+                    tableName, sourceName))
+        .isInstanceOf(SparkException.class)
+        .hasMessageContaining(
+            "Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec");
+  }
+
   @Test
   public void testRTAS() {
     sql(
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
index 81b193942394..4098a155be0d 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
@@ -70,7 +70,7 @@ public void createTableIfNotExists() {
         "CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) "
             + "USING iceberg PARTITIONED BY (truncate(id, 3))",
         sourceName);
-    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", sourceName);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')", sourceName);
   }
 
   @AfterEach
@@ -125,6 +125,31 @@ public void testPartitionedCTAS() {
         sql("SELECT * FROM %s ORDER BY id", tableName));
   }
 
+  @TestTemplate
+  public void testCTASWriteDistributionModeRespected() {
+    sql(
+        "CREATE TABLE %s USING iceberg PARTITIONED BY (bucket(2, id)) AS SELECT * FROM %s",
+        tableName, sourceName);
+
+    Schema expectedSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.LongType.get()),
+            Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+    PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema).bucket("id", 2).build();
+
+    Table ctasTable = validationCatalog.loadTable(tableIdent);
+
+    assertThat(ctasTable.schema().asStruct())
+        .as("Should have expected nullable schema")
+        .isEqualTo(expectedSchema.asStruct());
+    assertThat(ctasTable.spec()).as("Should be partitioned by id").isEqualTo(expectedSpec);
+    assertEquals(
+        "Should have rows matching the source table",
+        sql("SELECT * FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
   @TestTemplate
   public void testRTAS() {
     sql(
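As the two tests above demonstrate, the same CTAS fails on Spark 3.4 (the writer receives unclustered records) but succeeds on Spark 3.5, which honors the requested distribution. For Spark versions before 3.5.0, one possible workaround, sketched below with hypothetical table names, is to split the CTAS into an explicit CREATE TABLE followed by INSERT INTO, since the plain insert path does respect `write.distribution-mode`.

```java
// Sketch of a possible workaround for Spark < 3.5.0, where CTAS ignores the
// requested write distribution. Table names prod.db.target and prod.db.source
// are hypothetical; an Iceberg catalog is assumed to be configured.
import org.apache.spark.sql.SparkSession;

public class CtasDistributionWorkaround {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("ctas-workaround").getOrCreate();

    // Step 1: create the bucketed target table without data.
    spark.sql(
        "CREATE TABLE prod.db.target (id BIGINT, data STRING) "
            + "USING iceberg PARTITIONED BY (bucket(2, id))");

    // Step 2: a plain INSERT goes through the regular write path, which
    // honors write.distribution-mode and clusters rows by partition.
    spark.sql("INSERT INTO prod.db.target SELECT id, data FROM prod.db.source");

    spark.stop();
  }
}
```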