Spark 3.4: Support file and partition delete granularity

aokolnychyi committed Jan 31, 2024
1 parent f8866bf commit 738ff02
Showing 10 changed files with 338 additions and 11 deletions.
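
Before the diff itself, a hedged sketch of how the new setting is meant to be used (this sketch is not part of the commit): the tests below toggle it per table, and the SparkWriteOptions change at the bottom adds a per-write override. The property key is referenced through its constant because the literal string does not appear in this diff, and the 'file' / 'partition' values are assumed to round-trip through DeleteGranularity.fromString.

// Hedged usage sketch; `spark` is an assumed SparkSession and db.tbl a
// hypothetical Iceberg table.
spark.sql(
    String.format(
        "ALTER TABLE db.tbl SET TBLPROPERTIES ('%s' = 'file')",
        org.apache.iceberg.TableProperties.DELETE_GRANULARITY));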
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -28,14 +29,18 @@
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.TestSparkCatalog;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
@@ -80,6 +85,46 @@ public static void clearTestSparkCatalogCache() {
TestSparkCatalog.clearTables();
}

@Test
public void testDeleteFileGranularity() throws NoSuchTableException {
checkDeleteFileGranularity(DeleteGranularity.FILE);
}

@Test
public void testDeletePartitionGranularity() throws NoSuchTableException {
checkDeleteFileGranularity(DeleteGranularity.PARTITION);
}

private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
throws NoSuchTableException {
createAndInitPartitionedTable();

sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);

append(tableName, new Employee(1, "hr"), new Employee(2, "hr"));
append(tableName, new Employee(3, "hr"), new Employee(4, "hr"));
append(tableName, new Employee(1, "hardware"), new Employee(2, "hardware"));
append(tableName, new Employee(3, "hardware"), new Employee(4, "hardware"));

createBranchIfNeeded();

sql("DELETE FROM %s WHERE id = 1 OR id = 3", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
assertThat(table.snapshots()).hasSize(5);

Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2";
validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, null);
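// Note on the expectations just checked: the DELETE touches four data files (two
// appends per partition) across the "hr" and "hardware" partitions, so FILE
// granularity should produce one position-delete file per touched data file (4)
// while PARTITION granularity groups them per partition (2); the "2" is the
// changed-partition count expected by validateMergeOnRead.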

assertEquals(
"Should have expected rows",
ImmutableList.of(row(2, "hardware"), row(2, "hr"), row(4, "hardware"), row(4, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget()));
}

@Test
public void testCommitUnknownException() {
createAndInitTable("id INT, dep STRING, category STRING");
@@ -18,11 +18,20 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Encoders;
import org.junit.Test;

public class TestMergeOnReadMerge extends TestMerge {

@@ -56,4 +65,52 @@ protected Map<String, String> extraTableProperties() {
TableProperties.MERGE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
}

@Test
public void testMergeDeleteFileGranularity() {
checkMergeDeleteGranularity(DeleteGranularity.FILE);
}

@Test
public void testMergeDeletePartitionGranularity() {
checkMergeDeleteGranularity(DeleteGranularity.PARTITION);
}

private void checkMergeDeleteGranularity(DeleteGranularity deleteGranularity) {
createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */);

sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);

append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");

createBranchIfNeeded();

createOrReplaceView("source", ImmutableList.of(1, 3, 5), Encoders.INT());

sql(
"MERGE INTO %s AS t USING source AS s "
+ "ON t.id == s.value "
+ "WHEN MATCHED THEN "
+ " DELETE "
+ "WHEN NOT MATCHED THEN "
+ " INSERT (id, dep) VALUES (-1, 'other')",
commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
assertThat(table.snapshots()).hasSize(5);

Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2";
validateMergeOnRead(currentSnapshot, "3", expectedDeleteFilesCount, "1");
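// Note on the expectations just checked: the MERGE deletes ids 1 and 3 from four
// data files across "hr" and "it" (4 delete files at FILE granularity, 2 at
// PARTITION) and inserts the unmatched source value 5 as (-1, 'other') into a new
// partition, hence "3" changed partitions and "1" added data file.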

assertEquals(
"Should have expected rows",
ImmutableList.of(row(-1, "other"), row(2, "hr"), row(2, "it"), row(4, "hr"), row(4, "it")),
sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget()));
}
}
@@ -18,11 +18,19 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.SnapshotUtil;
import org.junit.Test;

public class TestMergeOnReadUpdate extends TestUpdate {

@@ -56,4 +64,51 @@ protected Map<String, String> extraTableProperties() {
TableProperties.UPDATE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
}

@Test
public void testUpdateFileGranularity() {
checkUpdateFileGranularity(DeleteGranularity.FILE);
}

@Test
public void testUpdatePartitionGranularity() {
checkUpdateFileGranularity(DeleteGranularity.PARTITION);
}

private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */);

sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);

append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");

createBranchIfNeeded();

sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
assertThat(table.snapshots()).hasSize(5);

Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2";
validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, "2");
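// Note on the expectations just checked: the UPDATE rewrites ids 1 and 3 in four
// data files across two partitions (4 delete files at FILE granularity, 2 at
// PARTITION) and writes the updated rows back as "2" new data files, one per
// partition. Since 1 -> 0 and 3 -> 2, each partition legitimately ends up with
// two rows whose id is 2 in the expected output below.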

assertEquals(
"Should have expected rows",
ImmutableList.of(
row(0, "hr"),
row(2, "hr"),
row(2, "hr"),
row(4, "hr"),
row(0, "it"),
row(2, "it"),
row(2, "it"),
row(4, "it")),
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
}
}
@@ -33,6 +33,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
@@ -364,8 +365,20 @@ public void writePartitionedClusteredEqualityDeleteWriter(Blackhole blackhole)

@Benchmark
@Threads(1)
- public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)
+ public void writeUnpartitionedClusteredPositionDeleteWriterPartitionGranularity(
+     Blackhole blackhole) throws IOException {
writeUnpartitionedClusteredPositionDeleteWriter(blackhole, DeleteGranularity.PARTITION);
}

@Benchmark
@Threads(1)
public void writeUnpartitionedClusteredPositionDeleteWriterFileGranularity(Blackhole blackhole)
throws IOException {
writeUnpartitionedClusteredPositionDeleteWriter(blackhole, DeleteGranularity.FILE);
}

private void writeUnpartitionedClusteredPositionDeleteWriter(
Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException {
FileIO io = table().io();

OutputFileFactory fileFactory = newFileFactory();
@@ -374,7 +387,7 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)

ClusteredPositionDeleteWriter<InternalRow> writer =
new ClusteredPositionDeleteWriter<>(
- writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+ writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity);
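// The new trailing DeleteGranularity argument controls how the writer splits its
// output: one delete file per referenced data file (FILE) or one per partition
// (PARTITION).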

PositionDelete<InternalRow> positionDelete = PositionDelete.create();
try (ClusteredPositionDeleteWriter<InternalRow> closeableWriter = writer) {
@@ -391,7 +404,20 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)

@Benchmark
@Threads(1)
- public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) throws IOException {
+ public void writeUnpartitionedFanoutPositionDeleteWriterPartitionGranularity(Blackhole blackhole)
+     throws IOException {
writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole, DeleteGranularity.PARTITION);
}

@Benchmark
@Threads(1)
public void writeUnpartitionedFanoutPositionDeleteWriterFileGranularity(Blackhole blackhole)
throws IOException {
writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole, DeleteGranularity.FILE);
}

private void writeUnpartitionedFanoutPositionDeleteWriterPartition(
Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException {
FileIO io = table().io();

OutputFileFactory fileFactory = newFileFactory();
@@ -400,7 +426,7 @@ public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) th

FanoutPositionOnlyDeleteWriter<InternalRow> writer =
new FanoutPositionOnlyDeleteWriter<>(
- writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+ writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity);

PositionDelete<InternalRow> positionDelete = PositionDelete.create();
try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer) {
@@ -417,8 +443,20 @@ public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) th

@Benchmark
@Threads(1)
- public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole blackhole)
-     throws IOException {
+ public void writeUnpartitionedFanoutPositionDeleteWriterShuffledPartitionGranularity(
+     Blackhole blackhole) throws IOException {
writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole, DeleteGranularity.PARTITION);
}

@Benchmark
@Threads(1)
public void writeUnpartitionedFanoutPositionDeleteWriterShuffledFileGranularity(
Blackhole blackhole) throws IOException {
writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole, DeleteGranularity.FILE);
}

private void writeUnpartitionedFanoutPositionDeleteWriterShuffled(
Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException {

FileIO io = table().io();

@@ -428,7 +466,7 @@ public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole black

FanoutPositionOnlyDeleteWriter<InternalRow> writer =
new FanoutPositionOnlyDeleteWriter<>(
- writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+ writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity);

PositionDelete<InternalRow> positionDelete = PositionDelete.create();
try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer) {
@@ -42,6 +42,7 @@
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -644,4 +645,15 @@ private String deleteOrcCompressionStrategy() {
.tableProperty(DELETE_ORC_COMPRESSION_STRATEGY)
.parseOptional();
}

public DeleteGranularity deleteGranularity() {
String valueAsString =
confParser
.stringConf()
.option(SparkWriteOptions.DELETE_GRANULARITY)
.tableProperty(TableProperties.DELETE_GRANULARITY)
.defaultValue(TableProperties.DELETE_GRANULARITY_DEFAULT)
.parse();
return DeleteGranularity.fromString(valueAsString);
}
}
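
The confParser chain above resolves the setting with the per-write option taking precedence over the table property, then falling back to the default constant (whose value is not shown in this diff). A hedged plain-Java equivalent of that lookup:

// writeOptions and tableProps are hypothetical stand-ins for the sources that
// confParser actually consults.
Map<String, String> writeOptions = ImmutableMap.of();
Map<String, String> tableProps = ImmutableMap.of();
String value =
    writeOptions.getOrDefault(
        SparkWriteOptions.DELETE_GRANULARITY,
        tableProps.getOrDefault(
            TableProperties.DELETE_GRANULARITY, TableProperties.DELETE_GRANULARITY_DEFAULT));
DeleteGranularity granularity = DeleteGranularity.fromString(value);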
@@ -85,4 +85,7 @@ private SparkWriteOptions() {}
public static final String COMPRESSION_CODEC = "compression-codec";
public static final String COMPRESSION_LEVEL = "compression-level";
public static final String COMPRESSION_STRATEGY = "compression-strategy";

// Overrides the delete granularity
public static final String DELETE_GRANULARITY = "delete-granularity";
}