Spark: Add distribution mode not respected for CTAS/RTAS before 3.5.0 #9439

Merged (1 commit), Jan 18, 2024
docs/spark-writes.md (5 changes: 3 additions & 2 deletions)
@@ -343,7 +343,8 @@ data.writeTo("prod.db.sample").option("mergeSchema","true").append()
Iceberg's default Spark writers require that the data in each spark task is clustered by partition values. This
distribution is required to minimize the number of file handles that are held open while writing. By default, starting
in Iceberg 1.2.0, Iceberg also requests that Spark pre-sort data to be written to fit this distribution. The
-request to Spark is done through the table property `write.distribution-mode` with the value `hash`.
+request to Spark is done through the table property `write.distribution-mode` with the value `hash`. Note that Spark
+does not respect the distribution mode for CTAS/RTAS before version 3.5.0.
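For an existing table, the same behavior can be requested explicitly by setting the property; a minimal Spark SQL sketch (reusing the `prod.db.sample` table name from the examples above, which is illustrative):

```sql
-- Request hash-distributed writes for this table
ALTER TABLE prod.db.sample
SET TBLPROPERTIES ('write.distribution-mode' = 'hash');
```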

Let's go through writing the data against below sample table:

@@ -380,7 +381,7 @@ write data before writing.
Practically, this means that each row is hashed based on the row's partition value and then placed
in a corresponding Spark task based upon that value. Further division and coalescing of tasks may take place because of
[Spark's Adaptive Query planning](#controlling-file-sizes).
-* `range` - This mode requests that Spark perform a range based exchanged to shuffle the data before writing.
+* `range` - This mode requests that Spark perform a range based exchange to shuffle the data before writing.
This is a two stage procedure which is more expensive than the `hash` mode. The first stage samples the data to
be written based on the partition and sort columns. The second stage uses the range information to shuffle the input data into Spark
tasks. Each task gets an exclusive range of the input data which clusters the data by partition and also globally sorts.
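The difference between the two modes can be sketched in plain Python (an illustrative toy model, not Spark's implementation; the row and task structures here are hypothetical):

```python
import bisect
import random

def hash_exchange(rows, key, num_tasks):
    """Hash mode: route each row to a task by hashing its partition value.

    Rows sharing a partition value always land in the same task, so each
    task needs only one open file per partition it receives.
    """
    tasks = [[] for _ in range(num_tasks)]
    for row in rows:
        tasks[hash(row[key]) % num_tasks].append(row)
    return tasks

def range_exchange(rows, key, num_tasks, sample_size=100):
    """Range mode: a two-stage exchange.

    Stage 1 samples the sort key to estimate range boundaries; stage 2
    routes each row to the task owning its range.  Each task then holds
    an exclusive, internally sorted slice of the key space.
    """
    sample = sorted(random.sample(rows, min(sample_size, len(rows))),
                    key=lambda r: r[key])
    # Pick up to num_tasks - 1 cut points from the sorted sample.
    step = max(1, len(sample) // num_tasks)
    bounds = [sample[i][key] for i in range(step, len(sample), step)][:num_tasks - 1]
    tasks = [[] for _ in range(num_tasks)]
    for row in rows:
        tasks[bisect.bisect_right(bounds, row[key])].append(row)
    return [sorted(t, key=lambda r: r[key]) for t in tasks]
```

In the range sketch, the tasks own disjoint, ordered slices of the key space, which is why concatenating the task outputs yields a global sort; the hash sketch clusters by partition but gives no global order.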
@@ -29,6 +29,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.types.Types;
import org.apache.spark.SparkException;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -46,7 +48,7 @@ public TestCreateTableAsSelect(
"CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) "
+ "USING iceberg PARTITIONED BY (truncate(id, 3))",
sourceName);
-sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", sourceName);
+sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')", sourceName);
}

@After
@@ -102,6 +104,18 @@ public void testPartitionedCTAS() {
sql("SELECT * FROM %s ORDER BY id", tableName));
}

@Test
public void testCTASWriteDistributionModeNotRespected() {
Assertions.assertThatThrownBy(
() ->
sql(
"CREATE TABLE %s USING iceberg PARTITIONED BY (bucket(2, id)) AS SELECT * FROM %s",
tableName, sourceName))
.isInstanceOf(SparkException.class)
.hasMessageContaining(
"Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec");
}
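The error asserted above comes from the writer's clustering assumption: each task keeps one file open at a time and closes it when the partition value changes, so a closed partition must never reappear. A toy Python model of that check (hypothetical, not Iceberg's actual writer; it assumes partition values are never `None`):

```python
def write_clustered(rows, partition_of):
    """Toy model of a clustered writer: one file open at a time,
    closed as soon as a row for a different partition arrives."""
    completed = set()  # partitions whose file has been closed
    current = None     # partition currently being written (None = none yet)
    files = {}
    for row in rows:
        part = partition_of(row)
        if part != current:
            if part in completed:
                # A closed partition reappeared: the input was not clustered.
                raise RuntimeError(
                    f"records are not clustered by partition: {part!r} "
                    "reappeared after its file was closed")
            if current is not None:
                completed.add(current)
            current = part
        files.setdefault(part, []).append(row)
    return files
```

`write_clustered([1, 1, 2, 2], ...)` succeeds, while `write_clustered([1, 2, 1], ...)` raises, analogous to the `SparkException` the test expects when the CTAS plan skips the hash exchange.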

@Test
public void testRTAS() {
sql(
@@ -70,7 +70,7 @@ public void createTableIfNotExists() {
"CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) "
+ "USING iceberg PARTITIONED BY (truncate(id, 3))",
sourceName);
-sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", sourceName);
+sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')", sourceName);
}

@AfterEach
@@ -125,6 +125,31 @@ public void testPartitionedCTAS() {
sql("SELECT * FROM %s ORDER BY id", tableName));
}

@TestTemplate
public void testCTASWriteDistributionModeRespected() {
sql(
"CREATE TABLE %s USING iceberg PARTITIONED BY (bucket(2, id)) AS SELECT * FROM %s",
tableName, sourceName);

Schema expectedSchema =
new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()));

PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema).bucket("id", 2).build();

Table ctasTable = validationCatalog.loadTable(tableIdent);

assertThat(ctasTable.schema().asStruct())
.as("Should have expected nullable schema")
.isEqualTo(expectedSchema.asStruct());
assertThat(ctasTable.spec()).as("Should be partitioned by id").isEqualTo(expectedSpec);
assertEquals(
"Should have rows matching the source table",
sql("SELECT * FROM %s ORDER BY id", sourceName),
sql("SELECT * FROM %s ORDER BY id", tableName));
}

@TestTemplate
public void testRTAS() {
sql(