Spark 3.4: Support file and partition delete granularity (apache#9602)
This change backports apache#9384 to Spark 3.4.
aokolnychyi authored and devangjhabakh committed Apr 22, 2024
1 parent c44de00 commit a840dbc
Showing 10 changed files with 339 additions and 11 deletions.
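Not part of the diff below, but as a hedged usage sketch: a merge-on-read table opts into the new behavior by setting the delete-granularity table property, mirroring the ALTER TABLE statements in the backported tests. The catalog and table names here are hypothetical; only the Iceberg constants come from this change.

import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.spark.sql.SparkSession;

public class SetDeleteGranularity {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("delete-granularity-demo").getOrCreate();

    // FILE groups position deletes by data file; PARTITION groups them by partition.
    spark.sql(
        String.format(
            "ALTER TABLE demo.db.employees SET TBLPROPERTIES ('%s' = '%s')",
            TableProperties.DELETE_GRANULARITY, DeleteGranularity.FILE));

    // Subsequent merge-on-read DELETE/UPDATE/MERGE commits use the configured granularity.
    spark.sql("DELETE FROM demo.db.employees WHERE id = 1 OR id = 3");
  }
}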
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -28,14 +29,18 @@
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.TestSparkCatalog;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
@@ -80,6 +85,46 @@ public static void clearTestSparkCatalogCache() {
TestSparkCatalog.clearTables();
}

@Test
public void testDeleteFileGranularity() throws NoSuchTableException {
checkDeleteFileGranularity(DeleteGranularity.FILE);
}

@Test
public void testDeletePartitionGranularity() throws NoSuchTableException {
checkDeleteFileGranularity(DeleteGranularity.PARTITION);
}

private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
throws NoSuchTableException {
createAndInitPartitionedTable();

sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);

append(tableName, new Employee(1, "hr"), new Employee(2, "hr"));
append(tableName, new Employee(3, "hr"), new Employee(4, "hr"));
append(tableName, new Employee(1, "hardware"), new Employee(2, "hardware"));
append(tableName, new Employee(3, "hardware"), new Employee(4, "hardware"));

createBranchIfNeeded();

sql("DELETE FROM %s WHERE id = 1 OR id = 3", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
assertThat(table.snapshots()).hasSize(5);

Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
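// FILE granularity writes one position delete file per affected data file (4 here);
// PARTITION granularity writes one per affected partition (2 here).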
String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2";
validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, null);

assertEquals(
"Should have expected rows",
ImmutableList.of(row(2, "hardware"), row(2, "hr"), row(4, "hardware"), row(4, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget()));
}

@Test
public void testCommitUnknownException() {
createAndInitTable("id INT, dep STRING, category STRING");
@@ -18,11 +18,20 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Encoders;
import org.junit.Test;

public class TestMergeOnReadMerge extends TestMerge {

@@ -56,4 +65,52 @@ protected Map<String, String> extraTableProperties() {
TableProperties.MERGE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
}

@Test
public void testMergeDeleteFileGranularity() {
checkMergeDeleteGranularity(DeleteGranularity.FILE);
}

@Test
public void testMergeDeletePartitionGranularity() {
checkMergeDeleteGranularity(DeleteGranularity.PARTITION);
}

private void checkMergeDeleteGranularity(DeleteGranularity deleteGranularity) {
createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */);

sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);

append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");

createBranchIfNeeded();

createOrReplaceView("source", ImmutableList.of(1, 3, 5), Encoders.INT());

sql(
"MERGE INTO %s AS t USING source AS s "
+ "ON t.id == s.value "
+ "WHEN MATCHED THEN "
+ " DELETE "
+ "WHEN NOT MATCHED THEN "
+ " INSERT (id, dep) VALUES (-1, 'other')",
commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
assertThat(table.snapshots()).hasSize(5);

Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2";
validateMergeOnRead(currentSnapshot, "3", expectedDeleteFilesCount, "1");

assertEquals(
"Should have expected rows",
ImmutableList.of(row(-1, "other"), row(2, "hr"), row(2, "it"), row(4, "hr"), row(4, "it")),
sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget()));
}
}
@@ -18,11 +18,19 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.SnapshotUtil;
import org.junit.Test;

public class TestMergeOnReadUpdate extends TestUpdate {

@@ -56,4 +64,51 @@ protected Map<String, String> extraTableProperties() {
TableProperties.UPDATE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
}

@Test
public void testUpdateFileGranularity() {
checkUpdateFileGranularity(DeleteGranularity.FILE);
}

@Test
public void testUpdatePartitionGranularity() {
checkUpdateFileGranularity(DeleteGranularity.PARTITION);
}

private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */);

sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);

append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");

createBranchIfNeeded();

sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
assertThat(table.snapshots()).hasSize(5);

Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2";
validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, "2");

assertEquals(
"Should have expected rows",
ImmutableList.of(
row(0, "hr"),
row(2, "hr"),
row(2, "hr"),
row(4, "hr"),
row(0, "it"),
row(2, "it"),
row(2, "it"),
row(4, "it")),
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
}
}
@@ -33,6 +33,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
@@ -364,8 +365,20 @@ public void writePartitionedClusteredEqualityDeleteWriter(Blackhole blackhole)

@Benchmark
@Threads(1)
public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)
public void writeUnpartitionedClusteredPositionDeleteWriterPartitionGranularity(
Blackhole blackhole) throws IOException {
writeUnpartitionedClusteredPositionDeleteWriter(blackhole, DeleteGranularity.PARTITION);
}

@Benchmark
@Threads(1)
public void writeUnpartitionedClusteredPositionDeleteWriterFileGranularity(Blackhole blackhole)
throws IOException {
writeUnpartitionedClusteredPositionDeleteWriter(blackhole, DeleteGranularity.FILE);
}

private void writeUnpartitionedClusteredPositionDeleteWriter(
Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException {
FileIO io = table().io();

OutputFileFactory fileFactory = newFileFactory();
@@ -374,7 +387,7 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)

ClusteredPositionDeleteWriter<InternalRow> writer =
new ClusteredPositionDeleteWriter<>(
writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity);

PositionDelete<InternalRow> positionDelete = PositionDelete.create();
try (ClusteredPositionDeleteWriter<InternalRow> closeableWriter = writer) {
@@ -391,7 +404,20 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)

@Benchmark
@Threads(1)
public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) throws IOException {
public void writeUnpartitionedFanoutPositionDeleteWriterPartitionGranularity(Blackhole blackhole)
throws IOException {
writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole, DeleteGranularity.PARTITION);
}

@Benchmark
@Threads(1)
public void writeUnpartitionedFanoutPositionDeleteWriterFileGranularity(Blackhole blackhole)
throws IOException {
writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole, DeleteGranularity.FILE);
}

private void writeUnpartitionedFanoutPositionDeleteWriterPartition(
Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException {
FileIO io = table().io();

OutputFileFactory fileFactory = newFileFactory();
@@ -400,7 +426,7 @@ public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) th

FanoutPositionOnlyDeleteWriter<InternalRow> writer =
new FanoutPositionOnlyDeleteWriter<>(
writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity);

PositionDelete<InternalRow> positionDelete = PositionDelete.create();
try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer) {
@@ -417,8 +443,20 @@ public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) th

@Benchmark
@Threads(1)
public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole blackhole)
throws IOException {
public void writeUnpartitionedFanoutPositionDeleteWriterShuffledPartitionGranularity(
Blackhole blackhole) throws IOException {
writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole, DeleteGranularity.PARTITION);
}

@Benchmark
@Threads(1)
public void writeUnpartitionedFanoutPositionDeleteWriterShuffledFileGranularity(
Blackhole blackhole) throws IOException {
writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole, DeleteGranularity.FILE);
}

private void writeUnpartitionedFanoutPositionDeleteWriterShuffled(
Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException {

FileIO io = table().io();

@@ -428,7 +466,7 @@ public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole black

FanoutPositionOnlyDeleteWriter<InternalRow> writer =
new FanoutPositionOnlyDeleteWriter<>(
writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity);

PositionDelete<InternalRow> positionDelete = PositionDelete.create();
try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer) {
@@ -42,6 +42,7 @@
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -644,4 +645,15 @@ private String deleteOrcCompressionStrategy() {
.tableProperty(DELETE_ORC_COMPRESSION_STRATEGY)
.parseOptional();
}

public DeleteGranularity deleteGranularity() {
String valueAsString =
confParser
.stringConf()
.option(SparkWriteOptions.DELETE_GRANULARITY)
.tableProperty(TableProperties.DELETE_GRANULARITY)
.defaultValue(TableProperties.DELETE_GRANULARITY_DEFAULT)
.parse();
return DeleteGranularity.fromString(valueAsString);
}
}
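For context, the new deleteGranularity() config above resolves the value in this order: per-write option first, then the table property, then the default. A minimal sketch of that precedence, assuming only the constants introduced in this commit (the helper class itself is hypothetical):

import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.spark.SparkWriteOptions;

class DeleteGranularityResolution {

  // Illustrative restatement of the deleteGranularity() method above:
  // write option > table property > default, parsed into the enum.
  static DeleteGranularity resolve(Map<String, String> writeOptions, Table table) {
    String value = writeOptions.get(SparkWriteOptions.DELETE_GRANULARITY);
    if (value == null) {
      value = table.properties().get(TableProperties.DELETE_GRANULARITY);
    }
    if (value == null) {
      value = TableProperties.DELETE_GRANULARITY_DEFAULT;
    }
    return DeleteGranularity.fromString(value);
  }
}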
@@ -85,4 +85,7 @@ private SparkWriteOptions() {}
public static final String COMPRESSION_CODEC = "compression-codec";
public static final String COMPRESSION_LEVEL = "compression-level";
public static final String COMPRESSION_STRATEGY = "compression-strategy";

// Overrides the delete granularity
public static final String DELETE_GRANULARITY = "delete-granularity";
}