From cdb327094f44e42f2e9063ce26f1abca106dbd7a Mon Sep 17 00:00:00 2001
From: advancedxy
Date: Fri, 19 Jan 2024 03:35:48 +0800
Subject: [PATCH] Spark 3.5: Propagate snapshot properties in compaction (#9449)

---
Reviewer notes (this section is ignored by `git am`):

* RewriteDataFilesCommitManager and RewritePositionDeletesCommitManager each
  gain a constructor overload that accepts a map of snapshot properties. The
  existing constructors delegate to the new one with ImmutableMap.of().
* Both managers copy every entry onto the pending rewrite via set(...) right
  before commit().
* BaseSnapshotUpdateSparkAction gains commitSummary(), which returns an
  immutable copy of the properties collected in `summary`. The two Spark
  rewrite actions pass it to their commit managers.
* The new tests set "key" -> "value" through snapshotProperty(...) and assert
  that it appears in the resulting snapshot summary alongside the listed
  SnapshotSummary metric keys.
* NOTE(review): this copy of the patch had its line structure flattened and
  its generic type arguments stripped. Line breaks and indentation were
  restored to satisfy the hunk line counts. Map<String, String> follows from
  usage in the patch; the element types RewriteFileGroup,
  RewritePositionDeletesGroup and FileScanTask are not visible here - confirm
  against the upstream commit before applying.
* NOTE(review): the author e-mail is missing from the From: header (apparently
  stripped with the type arguments); it has not been reconstructed.

 .../RewriteDataFilesCommitManager.java        | 14 ++++++++
 .../RewritePositionDeletesCommitManager.java  | 10 ++++++
 .../BaseSnapshotUpdateSparkAction.java        |  5 +++
 .../actions/RewriteDataFilesSparkAction.java  |  3 +-
 ...RewritePositionDeleteFilesSparkAction.java |  2 +-
 .../actions/TestRewriteDataFilesAction.java   | 18 ++++++++++
 .../TestRewritePositionDeleteFilesAction.java | 34 +++++++++++++++++++
 7 files changed, 84 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
index 265b5c5c2705..7f89db467d73 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
@@ -18,12 +18,14 @@
  */
 package org.apache.iceberg.actions;
 
+import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
@@ -36,6 +38,7 @@ public class RewriteDataFilesCommitManager {
   private final Table table;
   private final long startingSnapshotId;
   private final boolean useStartingSequenceNumber;
+  private final Map<String, String> snapshotProperties;
 
   // constructor used for testing
   public RewriteDataFilesCommitManager(Table table) {
@@ -48,9 +51,18 @@ public RewriteDataFilesCommitManager(Table table, long startingSnapshotId) {
 
   public RewriteDataFilesCommitManager(
       Table table, long startingSnapshotId, boolean useStartingSequenceNumber) {
+    this(table, startingSnapshotId, useStartingSequenceNumber, ImmutableMap.of());
+  }
+
+  public RewriteDataFilesCommitManager(
+      Table table,
+      long startingSnapshotId,
+      boolean useStartingSequenceNumber,
+      Map<String, String> snapshotProperties) {
     this.table = table;
     this.startingSnapshotId = startingSnapshotId;
     this.useStartingSequenceNumber = useStartingSequenceNumber;
+    this.snapshotProperties = snapshotProperties;
   }
 
   /**
@@ -75,6 +87,8 @@ public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
       rewrite.rewriteFiles(rewrittenDataFiles, addedDataFiles);
     }
 
+    snapshotProperties.forEach(rewrite::set);
+
     rewrite.commit();
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
index c55532692e6f..01b2f7528ee3 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.actions;
 
+import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.DeleteFile;
@@ -25,6 +26,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,10 +41,16 @@ public class RewritePositionDeletesCommitManager {
 
   private final Table table;
   private final long startingSnapshotId;
+  private final Map<String, String> snapshotProperties;
 
   public RewritePositionDeletesCommitManager(Table table) {
+    this(table, ImmutableMap.of());
+  }
+
+  public RewritePositionDeletesCommitManager(Table table, Map<String, String> snapshotProperties) {
     this.table = table;
     this.startingSnapshotId = table.currentSnapshot().snapshotId();
+    this.snapshotProperties = snapshotProperties;
   }
 
   /**
@@ -64,6 +72,8 @@ public void commit(Set<RewritePositionDeletesGroup> fileGroups) {
       }
     }
 
+    snapshotProperties.forEach(rewriteFiles::set);
+
     rewriteFiles.commit();
   }
 
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java
index 77debe1e589d..b69b80a8d3a6 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.actions;
 
 import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.spark.sql.SparkSession;
 
@@ -39,4 +40,8 @@ protected void commit(org.apache.iceberg.SnapshotUpdate<?> update) {
     summary.forEach(update::set);
     update.commit();
   }
+
+  protected Map<String, String> commitSummary() {
+    return ImmutableMap.copyOf(summary);
+  }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 6b5628a1f4b5..a2a585db78d2 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -257,7 +257,8 @@ private ExecutorService rewriteService() {
 
   @VisibleForTesting
   RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
-    return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber);
+    return new RewriteDataFilesCommitManager(
+        table, startingSnapshotId, useStartingSequenceNumber, commitSummary());
   }
 
   private Result doExecute(
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
index f3dfd2dcc364..539f6de92007 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
@@ -215,7 +215,7 @@ private ExecutorService rewriteService() {
   }
 
   private RewritePositionDeletesCommitManager commitManager() {
-    return new RewritePositionDeletesCommitManager(table);
+    return new RewritePositionDeletesCommitManager(table, commitSummary());
   }
 
   private Result doExecute(
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index d0d22e46ffc4..82b32f2ce002 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -60,6 +60,7 @@
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -1445,6 +1446,23 @@ public void testRewriteJobOrderFilesDesc() {
     assertThat(actual).as("Number of files order should not be ascending").isNotEqualTo(expected);
   }
 
+  @Test
+  public void testSnapshotProperty() {
+    Table table = createTable(4);
+    Result ignored = basicRewrite(table).snapshotProperty("key", "value").execute();
+    assertThat(table.currentSnapshot().summary())
+        .containsAllEntriesOf(ImmutableMap.of("key", "value"));
+    // make sure internal produced properties are not lost
+    String[] commitMetricsKeys =
+        new String[] {
+          SnapshotSummary.ADDED_FILES_PROP,
+          SnapshotSummary.DELETED_FILES_PROP,
+          SnapshotSummary.TOTAL_DATA_FILES_PROP,
+          SnapshotSummary.CHANGED_PARTITION_COUNT_PROP
+        };
+    assertThat(table.currentSnapshot().summary()).containsKeys(commitMetricsKeys);
+  }
+
   private Stream<RewriteFileGroup> toGroupStream(Table table, RewriteDataFilesSparkAction rewrite) {
     rewrite.validateAndInitOptions();
     StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index 7c55ff82df1e..89c44dbfccf8 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -49,6 +49,7 @@
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -613,6 +614,39 @@ public void testSchemaEvolution() throws Exception {
     assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
+  @TestTemplate
+  public void testSnapshotProperty() throws Exception {
+    Table table = createTableUnpartitioned(2, SCALE);
+    List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+    writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
+    assertThat(dataFiles).hasSize(2);
+
+    List<DeleteFile> deleteFiles = deleteFiles(table);
+    assertThat(deleteFiles).hasSize(2);
+
+    Result ignored =
+        SparkActions.get(spark)
+            .rewritePositionDeletes(table)
+            .snapshotProperty("key", "value")
+            .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+            .execute();
+    assertThat(table.currentSnapshot().summary())
+        .containsAllEntriesOf(ImmutableMap.of("key", "value"));
+
+    // make sure internal produced properties are not lost
+    String[] commitMetricsKeys =
+        new String[] {
+          SnapshotSummary.ADDED_DELETE_FILES_PROP,
+          SnapshotSummary.ADDED_POS_DELETES_PROP,
+          SnapshotSummary.CHANGED_PARTITION_COUNT_PROP,
+          SnapshotSummary.REMOVED_DELETE_FILES_PROP,
+          SnapshotSummary.REMOVED_POS_DELETES_PROP,
+          SnapshotSummary.TOTAL_DATA_FILES_PROP,
+          SnapshotSummary.TOTAL_DELETE_FILES_PROP,
+        };
+    assertThat(table.currentSnapshot().summary()).containsKeys(commitMetricsKeys);
+  }
+
   private Table createTablePartitioned(int partitions, int files, int numRecords) {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
     Table table =