Spark 3.5: Propagate snapshot properties in compaction (apache#9449)
advancedxy authored and geruh committed Jan 25, 2024
1 parent 6cb1d40 commit fa7992f
Showing 7 changed files with 84 additions and 2 deletions.
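In short, this change threads user-supplied snapshot properties through the data-file and position-delete compaction paths so they end up in the resulting snapshot's summary. A minimal usage sketch based on the snapshotProperty API exercised by the new tests below; the spark session, table, and the key/value pair are placeholders:

import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.actions.SparkActions;

// `spark` and `table` are assumed to be an existing SparkSession and Iceberg Table.
RewriteDataFiles.Result result =
    SparkActions.get(spark)
        .rewriteDataFiles(table)
        .snapshotProperty("key", "value") // carried into the commit's snapshot summary
        .execute();

// The custom entry now sits alongside the internally produced commit metrics.
Map<String, String> summary = table.currentSnapshot().summary();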
@@ -18,12 +18,14 @@
*/
package org.apache.iceberg.actions;

import java.util.Map;
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
@@ -36,6 +38,7 @@ public class RewriteDataFilesCommitManager {
private final Table table;
private final long startingSnapshotId;
private final boolean useStartingSequenceNumber;
private final Map<String, String> snapshotProperties;

// constructor used for testing
public RewriteDataFilesCommitManager(Table table) {
@@ -48,9 +51,18 @@ public RewriteDataFilesCommitManager(Table table, long startingSnapshotId) {

public RewriteDataFilesCommitManager(
Table table, long startingSnapshotId, boolean useStartingSequenceNumber) {
this(table, startingSnapshotId, useStartingSequenceNumber, ImmutableMap.of());
}

public RewriteDataFilesCommitManager(
Table table,
long startingSnapshotId,
boolean useStartingSequenceNumber,
Map<String, String> snapshotProperties) {
this.table = table;
this.startingSnapshotId = startingSnapshotId;
this.useStartingSequenceNumber = useStartingSequenceNumber;
this.snapshotProperties = snapshotProperties;
}

/**
@@ -75,6 +87,8 @@ public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
rewrite.rewriteFiles(rewrittenDataFiles, addedDataFiles);
}

snapshotProperties.forEach(rewrite::set);

rewrite.commit();
}

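For context, a sketch of using the new four-argument constructor directly; every entry in the map is applied to the RewriteFiles update via rewrite::set immediately before rewrite.commit(), as the hunk above shows. The property key/value and the surrounding variables are placeholders:

import java.util.Map;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

// `table`, `startingSnapshotId`, and `fileGroups` are assumed to exist.
Map<String, String> snapshotProps = ImmutableMap.of("custom-key", "custom-value");
RewriteDataFilesCommitManager commitManager =
    new RewriteDataFilesCommitManager(
        table, startingSnapshotId, /* useStartingSequenceNumber= */ true, snapshotProps);

// The entries in snapshotProps land in the new snapshot's summary on commit.
commitManager.commitFileGroups(fileGroups);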
@@ -18,13 +18,15 @@
*/
package org.apache.iceberg.actions;

import java.util.Map;
import java.util.Set;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,10 +41,16 @@ public class RewritePositionDeletesCommitManager {

private final Table table;
private final long startingSnapshotId;
private final Map<String, String> snapshotProperties;

public RewritePositionDeletesCommitManager(Table table) {
this(table, ImmutableMap.of());
}

public RewritePositionDeletesCommitManager(Table table, Map<String, String> snapshotProperties) {
this.table = table;
this.startingSnapshotId = table.currentSnapshot().snapshotId();
this.snapshotProperties = snapshotProperties;
}

/**
@@ -64,6 +72,8 @@ public void commit(Set<RewritePositionDeletesGroup> fileGroups) {
}
}

snapshotProperties.forEach(rewriteFiles::set);

rewriteFiles.commit();
}

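The position-delete path mirrors the data-file path above: the one-argument constructor now delegates with an empty ImmutableMap, so existing callers are unaffected, and the supplied entries are applied to the RewriteFiles update via rewriteFiles::set immediately before the commit.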
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.actions;

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.spark.sql.SparkSession;

@@ -39,4 +40,8 @@ protected void commit(org.apache.iceberg.SnapshotUpdate<?> update) {
summary.forEach(update::set);
update.commit();
}

protected Map<String, String> commitSummary() {
return ImmutableMap.copyOf(summary);
}
}
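This base action already applies its summary map when committing directly (summary.forEach(update::set) above); the new commitSummary() accessor exposes an immutable copy so the rewrite actions in the next two hunks can hand the same map to their commit managers. The snapshotProperty method that populates summary is defined elsewhere in this class and is not part of this diff; presumably it just records the entry, roughly:

// Not in this diff: a rough sketch of the existing snapshotProperty method
// that fills the summary map returned by commitSummary().
public ThisT snapshotProperty(String property, String value) {
  summary.put(property, value);
  return self();
}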
@@ -257,7 +257,8 @@ private ExecutorService rewriteService() {

@VisibleForTesting
RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
-    return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber);
+    return new RewriteDataFilesCommitManager(
+        table, startingSnapshotId, useStartingSequenceNumber, commitSummary());
}

private Result doExecute(
@@ -215,7 +215,7 @@ private ExecutorService rewriteService() {
}

private RewritePositionDeletesCommitManager commitManager() {
-    return new RewritePositionDeletesCommitManager(table);
+    return new RewritePositionDeletesCommitManager(table, commitSummary());
}

private Result doExecute(
@@ -60,6 +60,7 @@
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -1445,6 +1446,23 @@ public void testRewriteJobOrderFilesDesc() {
assertThat(actual).as("Number of files order should not be ascending").isNotEqualTo(expected);
}

@Test
public void testSnapshotProperty() {
Table table = createTable(4);
Result ignored = basicRewrite(table).snapshotProperty("key", "value").execute();
assertThat(table.currentSnapshot().summary())
.containsAllEntriesOf(ImmutableMap.of("key", "value"));
// make sure internally produced properties are not lost
String[] commitMetricsKeys =
new String[] {
SnapshotSummary.ADDED_FILES_PROP,
SnapshotSummary.DELETED_FILES_PROP,
SnapshotSummary.TOTAL_DATA_FILES_PROP,
SnapshotSummary.CHANGED_PARTITION_COUNT_PROP
};
assertThat(table.currentSnapshot().summary()).containsKeys(commitMetricsKeys);
}

private Stream<RewriteFileGroup> toGroupStream(Table table, RewriteDataFilesSparkAction rewrite) {
rewrite.validateAndInitOptions();
StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
@@ -49,6 +49,7 @@
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -613,6 +614,39 @@ public void testSchemaEvolution() throws Exception {
assertEquals("Rows must match", expectedRecords, actualRecords);
}

@TestTemplate
public void testSnapshotProperty() throws Exception {
Table table = createTableUnpartitioned(2, SCALE);
List<DataFile> dataFiles = TestHelpers.dataFiles(table);
writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
assertThat(dataFiles).hasSize(2);

List<DeleteFile> deleteFiles = deleteFiles(table);
assertThat(deleteFiles).hasSize(2);

Result ignored =
SparkActions.get(spark)
.rewritePositionDeletes(table)
.snapshotProperty("key", "value")
.option(SizeBasedFileRewriter.REWRITE_ALL, "true")
.execute();
assertThat(table.currentSnapshot().summary())
.containsAllEntriesOf(ImmutableMap.of("key", "value"));

// make sure internally produced properties are not lost
String[] commitMetricsKeys =
new String[] {
SnapshotSummary.ADDED_DELETE_FILES_PROP,
SnapshotSummary.ADDED_POS_DELETES_PROP,
SnapshotSummary.CHANGED_PARTITION_COUNT_PROP,
SnapshotSummary.REMOVED_DELETE_FILES_PROP,
SnapshotSummary.REMOVED_POS_DELETES_PROP,
SnapshotSummary.TOTAL_DATA_FILES_PROP,
SnapshotSummary.TOTAL_DELETE_FILES_PROP,
};
assertThat(table.currentSnapshot().summary()).containsKeys(commitMetricsKeys);
}

private Table createTablePartitioned(int partitions, int files, int numRecords) {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
Table table =
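Outside the unit tests above, one way to confirm that the properties reached the snapshot summary is Iceberg's snapshots metadata table, whose summary column carries these entries; db.tbl is a placeholder identifier:

// Inspect snapshot summaries through Spark SQL.
spark.sql("SELECT snapshot_id, summary FROM db.tbl.snapshots").show(false);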