diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 54f4bd81cbd1..de2f35bdf18c 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1040,6 +1040,11 @@ acceptedBreaks: old: "class org.apache.iceberg.types.Types.NestedField" new: "class org.apache.iceberg.types.Types.NestedField" justification: "new Constructor added" + "1.5.0.0-apple": + org.apache.iceberg:iceberg-api: + - code: "java.method.addedToInterface" + new: "method org.apache.iceberg.actions.CheckSnapshotIntegrity org.apache.iceberg.actions.CheckSnapshotIntegrity::completeCheck(boolean)" + justification: "Support new feature flag in CheckSnapshotIntegrity" apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: - code: "java.class.defaultSerializationChanged" diff --git a/api/src/main/java/org/apache/iceberg/actions/CheckSnapshotIntegrity.java b/api/src/main/java/org/apache/iceberg/actions/CheckSnapshotIntegrity.java index b3bea2233503..7ca4538ffe70 100644 --- a/api/src/main/java/org/apache/iceberg/actions/CheckSnapshotIntegrity.java +++ b/api/src/main/java/org/apache/iceberg/actions/CheckSnapshotIntegrity.java @@ -45,6 +45,16 @@ public interface CheckSnapshotIntegrity */ CheckSnapshotIntegrity targetVersion(String targetVersion); + /** + * Only check target version for referential integrity instead of incremental snapshot from table + * to target version. Only valid when used with targetVersion together + * + * @param completeCheck boolean to indicate whether to apply complete integrity check for + * snapshots in the target version. Default to false + * @return this for method chaining + */ + CheckSnapshotIntegrity completeCheck(boolean completeCheck); + /** The action result that contains a summary of the execution. */ interface Result { /** Returns locations of missing metadata/data files. */ diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/CheckSnapshotIntegritySparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/CheckSnapshotIntegritySparkAction.java index b82be2d6ad15..fd311293f610 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/CheckSnapshotIntegritySparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/CheckSnapshotIntegritySparkAction.java @@ -51,6 +51,7 @@ public class CheckSnapshotIntegritySparkAction private ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE; private String targetVersion; private Table targetTable; + private boolean completeCheck = false; private Consumer validateFunc = new Consumer() { @@ -101,12 +102,25 @@ public CheckSnapshotIntegritySparkAction targetVersion(String tVersion) { return this; } + @Override + public CheckSnapshotIntegrity completeCheck(boolean completeCheckFlag) { + this.completeCheck = completeCheckFlag; + return this; + } + @Override public Result execute() { + validateInputs(); JobGroupInfo info = newJobGroupInfo("CHECK-SNAPSHOT-INTEGRITY", jobDesc()); return withJobGroupInfo(info, this::doExecute); } + private void validateInputs() { + Preconditions.checkArgument( + !this.completeCheck || this.targetVersion != null, + "completeCheck can only be used when targetVersion is set."); + } + private String jobDesc() { return String.format( "Checking integrity of version '%s' of table %s.", targetVersion, table.name()); @@ -128,8 +142,12 @@ private Result doExecute() { private List filesToCheck() { Dataset targetFileDF = fileDS(targetTable).select("path"); - Dataset currentFileDF = fileDS(table).select("path"); - return targetFileDF.except(currentFileDF).as(Encoders.STRING()).collectAsList(); + if (!completeCheck) { + // check only incremental files + Dataset currentFileDF = fileDS(table).select("path"); + targetFileDF = targetFileDF.except(currentFileDF); + } + return targetFileDF.as(Encoders.STRING()).collectAsList(); } private Dataset fileDS(Table tbl) { diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestCheckSnapshotIntegrityAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestCheckSnapshotIntegrityAction.java index 34952574166a..34a5ae654518 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestCheckSnapshotIntegrityAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestCheckSnapshotIntegrityAction.java @@ -34,6 +34,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.actions.ActionsProvider; import org.apache.iceberg.actions.CheckSnapshotIntegrity; +import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -99,6 +100,42 @@ public void testCheckSnapshotIntegrity() { checkMissingFiles(2, result2); } + @Test + public void testCheckSnapshotIntegrityForTargetVersionOnly() { + String manifestListFileLocation = table.currentSnapshot().manifestListLocation(); + + String versionThreeFile = + ((HasTableOperations) table).operations().current().metadataFileLocation(); + Table startTable = newStaticTable(versionThreeFile, table.io()); + CheckSnapshotIntegrity.Result result = + actions().checkSnapshotIntegrity(startTable).targetVersion(versionThreeFile).execute(); + checkMissingFiles(0, result); + + // delete v3 manifest-list files + table.io().deleteFile(manifestListFileLocation); + + // fail as v3 is now corrupted + AssertHelpers.assertThrows( + "", + NullPointerException.class, + () -> actions().checkSnapshotIntegrity(startTable).execute()); + AssertHelpers.assertThrows( + "", + NotFoundException.class, + () -> + actions().checkSnapshotIntegrity(startTable).targetVersion(versionThreeFile).execute()); + + // pass if use v2 as targetVersion + String versionTwoFile = tableLocation + "metadata/v2.metadata.json"; + CheckSnapshotIntegrity.Result resultV1 = + actions() + .checkSnapshotIntegrity(startTable) + .targetVersion(versionTwoFile) + .completeCheck(true) + .execute(); + checkMissingFiles(0, resultV1); + } + @Test public void testStartFromFirstSnapshot() { List validFiles = validFiles(); @@ -134,12 +171,15 @@ public void testInputs() { "", IllegalArgumentException.class, () -> actions.targetVersion(null)); AssertHelpers.assertThrows( "", IllegalArgumentException.class, () -> actions.targetVersion("invalid")); + AssertHelpers.assertThrows( + "", IllegalArgumentException.class, () -> actions.completeCheck(true).execute()); // either version file name or path are valid String versionFilePath = currentMetadata(table).metadataFileLocation(); actions.targetVersion(versionFilePath); String versionFilename = fileName(versionFilePath); actions.targetVersion(versionFilename); + actions.completeCheck(true); } private void checkMissingFiles(int num, CheckSnapshotIntegrity.Result result) {