Spark: Document that distribution mode is not respected for CTAS/RTAS before 3.5.0
manuzhang committed Jan 18, 2024
1 parent 31d18f5 commit 58b5709
Showing 3 changed files with 40 additions and 4 deletions.
docs/spark-writes.md (5 changes: 3 additions & 2 deletions)
@@ -343,7 +343,8 @@ data.writeTo("prod.db.sample").option("mergeSchema","true").append()
Iceberg's default Spark writers require that the data in each Spark task be clustered by partition values. This
distribution is required to minimize the number of file handles that are held open while writing. By default, starting
in Iceberg 1.2.0, Iceberg also requests that Spark pre-sort data to be written to fit this distribution. The
-request to Spark is done through the table property `write.distribution-mode` with the value `hash`.
+request to Spark is done through the table property `write.distribution-mode` with the value `hash`. Note that Spark
+does not respect the requested distribution mode for CTAS/RTAS before 3.5.0.
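
For illustration, the case this new sentence documents is a partitioned CTAS; a minimal Spark SQL sketch (the table names `prod.db.ctas_sample` and `prod.db.source` are hypothetical):

```sql
-- Before Spark 3.5.0 the requested hash distribution is ignored for CTAS/RTAS,
-- so Iceberg's default writer may receive records that are not clustered by
-- partition value and reject the write.
CREATE TABLE prod.db.ctas_sample
USING iceberg
PARTITIONED BY (bucket(2, id))
AS SELECT * FROM prod.db.source;
```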

Let's go through writing the data against the sample table below:

@@ -380,7 +381,7 @@ write data before writing.
Practically, this means that each row is hashed based on the row's partition value and then placed
in a corresponding Spark task based upon that value. Further division and coalescing of tasks may take place because of
[Spark's Adaptive Query planning](#controlling-file-sizes).
-* `range` - This mode requests that Spark perform a range based exchanged to shuffle the data before writing.
+* `range` - This mode requests that Spark perform a range-based exchange to shuffle the data before writing.
This is a two stage procedure which is more expensive than the `hash` mode. The first stage samples the data to
be written based on the partition and sort columns. The second stage uses the range information to shuffle the input data into Spark
tasks. Each task gets an exclusive range of the input data, which clusters the data by partition and also sorts it globally (see the sketch after this list).
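
Either mode is requested through the same `write.distribution-mode` property; a minimal Spark SQL sketch, assuming the `prod.db.sample` table used elsewhere on this page:

```sql
-- Ask Spark to range-exchange rows before writing: sample the partition/sort
-- columns, then shuffle rows into exclusive, globally sorted ranges.
-- Accepted values for this property are 'none', 'hash', and 'range'.
ALTER TABLE prod.db.sample
SET TBLPROPERTIES ('write.distribution-mode' = 'range');
```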
TestCreateTableAsSelect.java
@@ -29,6 +29,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -46,7 +47,7 @@ public TestCreateTableAsSelect(
"CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) "
+ "USING iceberg PARTITIONED BY (truncate(id, 3))",
sourceName);
sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", sourceName);
sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')", sourceName);
}

@After
@@ -100,6 +101,15 @@ public void testPartitionedCTAS() {
"Should have rows matching the source table",
sql("SELECT * FROM %s ORDER BY id", sourceName),
sql("SELECT * FROM %s ORDER BY id", tableName));

+    // Write distribution mode is not respected in CTAS before Spark 3.5.0, so this partitioned CTAS fails
+    Assert.assertThrows(
+        "Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec",
+        SparkException.class,
+        () ->
+            sql(
+                "CREATE TABLE %s USING iceberg PARTITIONED BY (bucket(2, id)) AS SELECT * FROM %s",
+                "table2", sourceName));
}

@Test
TestCreateTableAsSelect.java (JUnit 5 variant)
@@ -70,7 +70,7 @@ public void createTableIfNotExists() {
"CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) "
+ "USING iceberg PARTITIONED BY (truncate(id, 3))",
sourceName);
sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", sourceName);
sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')", sourceName);
}

@AfterEach
@@ -125,6 +125,31 @@ public void testPartitionedCTAS() {
sql("SELECT * FROM %s ORDER BY id", tableName));
}

+  @TestTemplate
+  public void testCTASWriteDistributionModeRespected() {
+    sql(
+        "CREATE TABLE %s USING iceberg PARTITIONED BY (bucket(2, id)) AS SELECT * FROM %s",
+        tableName, sourceName);
+
+    Schema expectedSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.LongType.get()),
+            Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+    PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema).bucket("id", 2).build();
+
+    Table ctasTable = validationCatalog.loadTable(tableIdent);
+
+    assertThat(ctasTable.schema().asStruct())
+        .as("Should have expected nullable schema")
+        .isEqualTo(expectedSchema.asStruct());
+    assertThat(ctasTable.spec()).as("Should be partitioned by id").isEqualTo(expectedSpec);
+    assertEquals(
+        "Should have rows matching the source table",
+        sql("SELECT * FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }

@TestTemplate
public void testRTAS() {
sql(
