Skip to content

Commit

Permalink
[internal] rdar://135700073 Support completeCheck in CheckSnapshotInt…
Browse files Browse the repository at this point in the history
…egrity (apache#1334)


Provide a new feature flag to check referential integrity on target version only. This help workaround failure where files referenced by iceberg metadata are missing at runtime lookup
  • Loading branch information
Steve (ASE) Zhang authored and GitHub Enterprise committed Sep 11, 2024
1 parent 90ef7e9 commit 638dfc0
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,11 @@ acceptedBreaks:
old: "class org.apache.iceberg.types.Types.NestedField"
new: "class org.apache.iceberg.types.Types.NestedField"
justification: "new Constructor added"
"1.5.0.0-apple":
org.apache.iceberg:iceberg-api:
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.actions.CheckSnapshotIntegrity org.apache.iceberg.actions.CheckSnapshotIntegrity::completeCheck(boolean)"
justification: "Support new feature flag in CheckSnapshotIntegrity"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ public interface CheckSnapshotIntegrity
*/
CheckSnapshotIntegrity targetVersion(String targetVersion);

/**
* Only check target version for referential integrity instead of incremental snapshot from table
* to target version. Only valid when used with targetVersion together
*
* @param completeCheck boolean to indicate whether to apply complete integrity check for
* snapshots in the target version. Default to false
* @return this for method chaining
*/
CheckSnapshotIntegrity completeCheck(boolean completeCheck);

/** The action result that contains a summary of the execution. */
interface Result {
/** Returns locations of missing metadata/data files. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class CheckSnapshotIntegritySparkAction
private ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
private String targetVersion;
private Table targetTable;
private boolean completeCheck = false;

private Consumer<String> validateFunc =
new Consumer<String>() {
Expand Down Expand Up @@ -101,12 +102,25 @@ public CheckSnapshotIntegritySparkAction targetVersion(String tVersion) {
return this;
}

@Override
public CheckSnapshotIntegrity completeCheck(boolean completeCheckFlag) {
this.completeCheck = completeCheckFlag;
return this;
}

@Override
public Result execute() {
validateInputs();
JobGroupInfo info = newJobGroupInfo("CHECK-SNAPSHOT-INTEGRITY", jobDesc());
return withJobGroupInfo(info, this::doExecute);
}

private void validateInputs() {
Preconditions.checkArgument(
!this.completeCheck || this.targetVersion != null,
"completeCheck can only be used when targetVersion is set.");
}

private String jobDesc() {
return String.format(
"Checking integrity of version '%s' of table %s.", targetVersion, table.name());
Expand All @@ -128,8 +142,12 @@ private Result doExecute() {

private List<String> filesToCheck() {
Dataset<Row> targetFileDF = fileDS(targetTable).select("path");
Dataset<Row> currentFileDF = fileDS(table).select("path");
return targetFileDF.except(currentFileDF).as(Encoders.STRING()).collectAsList();
if (!completeCheck) {
// check only incremental files
Dataset<Row> currentFileDF = fileDS(table).select("path");
targetFileDF = targetFileDF.except(currentFileDF);
}
return targetFileDF.as(Encoders.STRING()).collectAsList();
}

private Dataset<FileInfo> fileDS(Table tbl) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.actions.ActionsProvider;
import org.apache.iceberg.actions.CheckSnapshotIntegrity;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -99,6 +100,42 @@ public void testCheckSnapshotIntegrity() {
checkMissingFiles(2, result2);
}

@Test
public void testCheckSnapshotIntegrityForTargetVersionOnly() {
String manifestListFileLocation = table.currentSnapshot().manifestListLocation();

String versionThreeFile =
((HasTableOperations) table).operations().current().metadataFileLocation();
Table startTable = newStaticTable(versionThreeFile, table.io());
CheckSnapshotIntegrity.Result result =
actions().checkSnapshotIntegrity(startTable).targetVersion(versionThreeFile).execute();
checkMissingFiles(0, result);

// delete v3 manifest-list files
table.io().deleteFile(manifestListFileLocation);

// fail as v3 is now corrupted
AssertHelpers.assertThrows(
"",
NullPointerException.class,
() -> actions().checkSnapshotIntegrity(startTable).execute());
AssertHelpers.assertThrows(
"",
NotFoundException.class,
() ->
actions().checkSnapshotIntegrity(startTable).targetVersion(versionThreeFile).execute());

// pass if use v2 as targetVersion
String versionTwoFile = tableLocation + "metadata/v2.metadata.json";
CheckSnapshotIntegrity.Result resultV1 =
actions()
.checkSnapshotIntegrity(startTable)
.targetVersion(versionTwoFile)
.completeCheck(true)
.execute();
checkMissingFiles(0, resultV1);
}

@Test
public void testStartFromFirstSnapshot() {
List<String> validFiles = validFiles();
Expand Down Expand Up @@ -134,12 +171,15 @@ public void testInputs() {
"", IllegalArgumentException.class, () -> actions.targetVersion(null));
AssertHelpers.assertThrows(
"", IllegalArgumentException.class, () -> actions.targetVersion("invalid"));
AssertHelpers.assertThrows(
"", IllegalArgumentException.class, () -> actions.completeCheck(true).execute());

// either version file name or path are valid
String versionFilePath = currentMetadata(table).metadataFileLocation();
actions.targetVersion(versionFilePath);
String versionFilename = fileName(versionFilePath);
actions.targetVersion(versionFilename);
actions.completeCheck(true);
}

private void checkMissingFiles(int num, CheckSnapshotIntegrity.Result result) {
Expand Down

0 comments on commit 638dfc0

Please sign in to comment.