Spark 3.4: Support file and partition delete granularity #9602

Merged: 1 commit, Feb 1, 2024
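Based on the hunks below, this change threads a configurable position-delete granularity (FILE or PARTITION) through the Spark 3.4 merge-on-read write path: the clustered and fanout position-delete writers receive a DeleteGranularity, SparkWriteConf resolves the setting from a new write option or the table property, and the DELETE, MERGE, and UPDATE extension tests plus the position-delete writer benchmarks cover both modes.

A minimal sketch of how the setting is applied, mirroring the ALTER TABLE statement the tests use; the SparkSession variable (spark) and the table identifier (db.tbl) are placeholder assumptions, not part of the diff:

import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;

// Set the per-table default delete granularity, exactly as the tests below do.
spark.sql(
    String.format(
        "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
        "db.tbl", TableProperties.DELETE_GRANULARITY, DeleteGranularity.FILE));

A per-write override is also exposed as SparkWriteOptions.DELETE_GRANULARITY ("delete-granularity"); see the SparkWriteConf and SparkWriteOptions hunks at the end of the diff.
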
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -28,14 +29,18 @@
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.TestSparkCatalog;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
@@ -80,6 +85,46 @@ public static void clearTestSparkCatalogCache() {
TestSparkCatalog.clearTables();
}

@Test
public void testDeleteFileGranularity() throws NoSuchTableException {
checkDeleteFileGranularity(DeleteGranularity.FILE);
}

@Test
public void testDeletePartitionGranularity() throws NoSuchTableException {
checkDeleteFileGranularity(DeleteGranularity.PARTITION);
}

private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
throws NoSuchTableException {
createAndInitPartitionedTable();

sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);

append(tableName, new Employee(1, "hr"), new Employee(2, "hr"));
append(tableName, new Employee(3, "hr"), new Employee(4, "hr"));
append(tableName, new Employee(1, "hardware"), new Employee(2, "hardware"));
append(tableName, new Employee(3, "hardware"), new Employee(4, "hardware"));

createBranchIfNeeded();

sql("DELETE FROM %s WHERE id = 1 OR id = 3", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
assertThat(table.snapshots()).hasSize(5);

Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
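// Each partition ('hr', 'hardware') has two data files containing deleted rows, so FILE granularity
// should produce one delete file per data file (4) and PARTITION granularity one per partition (2).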
String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2";
validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, null);

assertEquals(
"Should have expected rows",
ImmutableList.of(row(2, "hardware"), row(2, "hr"), row(4, "hardware"), row(4, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget()));
}

@Test
public void testCommitUnknownException() {
createAndInitTable("id INT, dep STRING, category STRING");

@@ -18,11 +18,20 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Encoders;
import org.junit.Test;

public class TestMergeOnReadMerge extends TestMerge {

@@ -56,4 +65,52 @@ protected Map<String, String> extraTableProperties() {
TableProperties.MERGE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
}

@Test
public void testMergeDeleteFileGranularity() {
checkMergeDeleteGranularity(DeleteGranularity.FILE);
}

@Test
public void testMergeDeletePartitionGranularity() {
checkMergeDeleteGranularity(DeleteGranularity.PARTITION);
}

private void checkMergeDeleteGranularity(DeleteGranularity deleteGranularity) {
createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */);

sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);

append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");

createBranchIfNeeded();

createOrReplaceView("source", ImmutableList.of(1, 3, 5), Encoders.INT());

sql(
"MERGE INTO %s AS t USING source AS s "
+ "ON t.id == s.value "
+ "WHEN MATCHED THEN "
+ " DELETE "
+ "WHEN NOT MATCHED THEN "
+ " INSERT (id, dep) VALUES (-1, 'other')",
commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
assertThat(table.snapshots()).hasSize(5);

Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
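// The matched ids (1 and 3) live in both data files of each partition ('hr', 'it'), so FILE granularity
// should produce one delete file per data file (4) and PARTITION granularity one per partition (2).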
String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2";
validateMergeOnRead(currentSnapshot, "3", expectedDeleteFilesCount, "1");

assertEquals(
"Should have expected rows",
ImmutableList.of(row(-1, "other"), row(2, "hr"), row(2, "it"), row(4, "hr"), row(4, "it")),
sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget()));
}
}

@@ -18,11 +18,19 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.SnapshotUtil;
import org.junit.Test;

public class TestMergeOnReadUpdate extends TestUpdate {

@@ -56,4 +64,51 @@ protected Map<String, String> extraTableProperties() {
TableProperties.UPDATE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
}

@Test
public void testUpdateFileGranularity() {
checkUpdateFileGranularity(DeleteGranularity.FILE);
}

@Test
public void testUpdatePartitionGranularity() {
checkUpdateFileGranularity(DeleteGranularity.PARTITION);
}

private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */);

sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);

append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");

createBranchIfNeeded();

sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
assertThat(table.snapshots()).hasSize(5);

Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
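// The updated ids (1 and 3) live in both data files of each partition ('hr', 'it'), so FILE granularity
// should produce one delete file per data file (4) and PARTITION granularity one per partition (2).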
String expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? "4" : "2";
validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, "2");

assertEquals(
"Should have expected rows",
ImmutableList.of(
row(0, "hr"),
row(2, "hr"),
row(2, "hr"),
row(4, "hr"),
row(0, "it"),
row(2, "it"),
row(2, "it"),
row(4, "it")),
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
}
}

@@ -33,6 +33,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
@@ -364,8 +365,20 @@ public void writePartitionedClusteredEqualityDeleteWriter(Blackhole blackhole)

@Benchmark
@Threads(1)
- public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)
+ public void writeUnpartitionedClusteredPositionDeleteWriterPartitionGranularity(
+ Blackhole blackhole) throws IOException {
writeUnpartitionedClusteredPositionDeleteWriter(blackhole, DeleteGranularity.PARTITION);
}

@Benchmark
@Threads(1)
public void writeUnpartitionedClusteredPositionDeleteWriterFileGranularity(Blackhole blackhole)
throws IOException {
writeUnpartitionedClusteredPositionDeleteWriter(blackhole, DeleteGranularity.FILE);
}

private void writeUnpartitionedClusteredPositionDeleteWriter(
Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException {
FileIO io = table().io();

OutputFileFactory fileFactory = newFileFactory();
@@ -374,7 +387,7 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)

ClusteredPositionDeleteWriter<InternalRow> writer =
new ClusteredPositionDeleteWriter<>(
- writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+ writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity);

PositionDelete<InternalRow> positionDelete = PositionDelete.create();
try (ClusteredPositionDeleteWriter<InternalRow> closeableWriter = writer) {
@@ -391,7 +404,20 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)

@Benchmark
@Threads(1)
- public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) throws IOException {
+ public void writeUnpartitionedFanoutPositionDeleteWriterPartitionGranularity(Blackhole blackhole)
+ throws IOException {
writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole, DeleteGranularity.PARTITION);
}

@Benchmark
@Threads(1)
public void writeUnpartitionedFanoutPositionDeleteWriterFileGranularity(Blackhole blackhole)
throws IOException {
writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole, DeleteGranularity.FILE);
}

private void writeUnpartitionedFanoutPositionDeleteWriterPartition(
Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException {
FileIO io = table().io();

OutputFileFactory fileFactory = newFileFactory();
@@ -400,7 +426,7 @@ public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) th

FanoutPositionOnlyDeleteWriter<InternalRow> writer =
new FanoutPositionOnlyDeleteWriter<>(
- writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+ writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity);

PositionDelete<InternalRow> positionDelete = PositionDelete.create();
try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer) {
@@ -417,8 +443,20 @@ public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole blackhole) th

@Benchmark
@Threads(1)
- public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole blackhole)
- throws IOException {
+ public void writeUnpartitionedFanoutPositionDeleteWriterShuffledPartitionGranularity(
+ Blackhole blackhole) throws IOException {
writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole, DeleteGranularity.PARTITION);
}

@Benchmark
@Threads(1)
public void writeUnpartitionedFanoutPositionDeleteWriterShuffledFileGranularity(
Blackhole blackhole) throws IOException {
writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole, DeleteGranularity.FILE);
}

private void writeUnpartitionedFanoutPositionDeleteWriterShuffled(
Blackhole blackhole, DeleteGranularity deleteGranularity) throws IOException {

FileIO io = table().io();

@@ -428,7 +466,7 @@ public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole black

FanoutPositionOnlyDeleteWriter<InternalRow> writer =
new FanoutPositionOnlyDeleteWriter<>(
- writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+ writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, deleteGranularity);

PositionDelete<InternalRow> positionDelete = PositionDelete.create();
try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer) {

@@ -42,6 +42,7 @@
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -644,4 +645,15 @@ private String deleteOrcCompressionStrategy() {
.tableProperty(DELETE_ORC_COMPRESSION_STRATEGY)
.parseOptional();
}

public DeleteGranularity deleteGranularity() {
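// Resolution order: the per-write option first, then the table property, then the default.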
String valueAsString =
confParser
.stringConf()
.option(SparkWriteOptions.DELETE_GRANULARITY)
.tableProperty(TableProperties.DELETE_GRANULARITY)
.defaultValue(TableProperties.DELETE_GRANULARITY_DEFAULT)
.parse();
return DeleteGranularity.fromString(valueAsString);
}
}

@@ -85,4 +85,7 @@ private SparkWriteOptions() {}
public static final String COMPRESSION_CODEC = "compression-codec";
public static final String COMPRESSION_LEVEL = "compression-level";
public static final String COMPRESSION_STRATEGY = "compression-strategy";

// Overrides the delete granularity
public static final String DELETE_GRANULARITY = "delete-granularity";
}