Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
szehon-ho committed Jan 5, 2025
1 parent 36b3365 commit 33d53ce
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 103 deletions.
18 changes: 9 additions & 9 deletions core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public static TableMetadata replacePaths(
metadata.snapshotLog(),
metadataLogEntries,
metadata.refs(),
// TODO: update statistic file paths
metadata.statisticsFiles(),
metadata.partitionStatisticsFiles(),
metadata.changes());
Expand Down Expand Up @@ -276,7 +277,7 @@ private static List<ManifestFile> manifestFilesInSnapshot(FileIO io, Snapshot sn
* @param targetPrefix target prefix that will replace it
* @return a copy plan of content files in the manifest that was rewritten
*/
public static List<Pair<String, String>> rewriteManifest(
public static RewriteResult<DataFile> rewriteDataManifest(
ManifestFile manifestFile,
OutputFile outputFile,
FileIO io,
Expand All @@ -292,7 +293,7 @@ public static List<Pair<String, String>> rewriteManifest(
ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) {
return StreamSupport.stream(reader.entries().spliterator(), false)
.map(entry -> writeDataFileEntry(entry, spec, sourcePrefix, targetPrefix, writer))
.collect(Collectors.toList());
.reduce(new RewriteResult<>(), RewriteResult::append);
}
}

Expand Down Expand Up @@ -335,12 +336,13 @@ public static RewriteResult<DeleteFile> rewriteDeleteManifest(
}
}

private static Pair<String, String> writeDataFileEntry(
private static RewriteResult<DataFile> writeDataFileEntry(
ManifestEntry<DataFile> entry,
PartitionSpec spec,
String sourcePrefix,
String targetPrefix,
ManifestWriter<DataFile> writer) {
RewriteResult<DataFile> result = new RewriteResult<>();
DataFile dataFile = entry.file();
String sourceDataFilePath = dataFile.location();
Preconditions.checkArgument(
Expand All @@ -352,7 +354,8 @@ private static Pair<String, String> writeDataFileEntry(
DataFile newDataFile =
DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build();
appendEntryWithFile(entry, writer, newDataFile);
return Pair.of(sourceDataFilePath, newDataFile.location());
result.copyPlan().add(Pair.of(sourceDataFilePath, newDataFile.location()));
return result;
}

private static RewriteResult<DeleteFile> writeDeleteFileEntry(
Expand Down Expand Up @@ -386,7 +389,7 @@ private static RewriteResult<DeleteFile> writeDeleteFileEntry(
case EQUALITY_DELETES:
DeleteFile eqDeleteFile = newEqualityDeleteEntry(file, spec, sourcePrefix, targetPrefix);
appendEntryWithFile(entry, writer, eqDeleteFile);
// we do not need to recursively rewrite the equality delete, just move it
// No need to rewrite equality delete files as they do not contain absolute file paths.
result.copyPlan().add(Pair.of(file.location(), eqDeleteFile.location()));
return result;

Expand Down Expand Up @@ -469,10 +472,7 @@ public static void rewritePositionDeleteFile(
String path = deleteFile.location();
if (!path.startsWith(sourcePrefix)) {
throw new UnsupportedOperationException(
"Expected delete file to be under the source prefix: "
+ sourcePrefix
+ " but was "
+ path);
String.format("Expected delete file %s to start with prefix: %s", path, sourcePrefix));
}
InputFile sourceFile = io.newInputFile(path);
try (CloseableIterable<Record> reader =
Expand Down
23 changes: 0 additions & 23 deletions core/src/main/java/org/apache/iceberg/TableMetadataUtil.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
Expand Down Expand Up @@ -207,7 +208,16 @@ private void validateAndSetStartVersion() {
}

private String validateVersion(TableMetadata tableMetadata, String versionFileName) {
String versionFile = versionFile(tableMetadata, versionFileName);
String versionFile = null;
if (versionInFilePath(tableMetadata.metadataFileLocation(), versionFileName)) {
versionFile = tableMetadata.metadataFileLocation();
}

for (MetadataLogEntry log : tableMetadata.previousFiles()) {
if (versionInFilePath(log.file(), versionFileName)) {
versionFile = log.file();
}
}

Preconditions.checkNotNull(
versionFile, "Version file %s does not exist in metadata log.", versionFile);
Expand All @@ -216,19 +226,6 @@ private String validateVersion(TableMetadata tableMetadata, String versionFileNa
return versionFile;
}

/**
 * Finds the metadata file whose file name matches {@code versionFileName}.
 *
 * <p>The current metadata file location is checked first; otherwise the metadata log is scanned
 * and the first matching entry wins.
 *
 * @param metadata table metadata providing the current location and the metadata log
 * @param versionFileName bare file name of the version file to look up
 * @return the full path of the matching metadata file, or {@code null} if none matches
 */
private String versionFile(TableMetadata metadata, String versionFileName) {
  String currentLocation = metadata.metadataFileLocation();
  if (versionInFilePath(currentLocation, versionFileName)) {
    return currentLocation;
  }

  // Fall back to the metadata log; first match mirrors the previous early-return loop.
  return metadata.previousFiles().stream()
      .map(MetadataLogEntry::file)
      .filter(file -> versionInFilePath(file, versionFileName))
      .findFirst()
      .orElse(null);
}

/**
 * Checks whether the file-name component of {@code path} (as extracted by
 * {@link RewriteTablePathUtil#fileName}) is exactly {@code version}.
 */
private boolean versionInFilePath(String path, String version) {
  String fileName = RewriteTablePathUtil.fileName(path);
  return fileName.equals(version);
}
Expand Down Expand Up @@ -273,10 +270,9 @@ private String rebuildMetadata() {

// rebuild version files
RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
Set<Snapshot> diffSnapshots =
getDiffSnapshotIds(startMetadata, rewriteVersionResult.toRewrite());
Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite());

Set<String> manifestsToRewrite = manifestsToRewrite(diffSnapshots, startMetadata);
Set<String> manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata);
Set<Snapshot> validSnapshots =
Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata));

Expand All @@ -287,11 +283,16 @@ private String rebuildMetadata() {
.reduce(new RewriteResult<>(), RewriteResult::append);

// rebuild manifest files
RewriteResult<DeleteFile> rewriteManifestResult =
RewriteContentFileResult rewriteManifestResult =
rewriteManifests(endMetadata, rewriteManifestListResult.toRewrite());

// rebuild position delete files
rewritePositionDeletes(endMetadata, rewriteManifestResult.toRewrite());
Set<DeleteFile> deleteFiles =
rewriteManifestResult.toRewrite().stream()
.filter(e -> e instanceof DeleteFile)
.map(e -> (DeleteFile) e)
.collect(Collectors.toSet());
rewritePositionDeletes(endMetadata, deleteFiles);

Set<Pair<String, String>> copyPlan = Sets.newHashSet();
copyPlan.addAll(rewriteVersionResult.copyPlan());
Expand All @@ -318,8 +319,7 @@ private String saveFileList(Set<Pair<String, String>> filesToMove) {
return fileListPath;
}

private Set<Snapshot> getDiffSnapshotIds(
TableMetadata startMetadata, Set<Snapshot> allSnapshots) {
private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata, Set<Snapshot> allSnapshots) {
if (startMetadata == null) {
return allSnapshots;
} else {
Expand Down Expand Up @@ -396,44 +396,59 @@ private RewriteResult<ManifestFile> rewriteManifestList(
return result;
}

private Set<String> manifestsToRewrite(Set<Snapshot> diffSnapshots, TableMetadata startMetadata) {
private Set<String> manifestsToRewrite(
Set<Snapshot> deltaSnapshots, TableMetadata startMetadata) {
try {
Table endStaticTable = newStaticTable(endVersionName, table.io());
Dataset<Row> lastVersionFiles = manifestDS(endStaticTable).select("path");
if (startMetadata == null) {
return Sets.newHashSet(lastVersionFiles.distinct().as(Encoders.STRING()).collectAsList());
} else {
Set<Long> diffSnapshotIds =
diffSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
Set<Long> deltaSnapshotIds =
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
return Sets.newHashSet(
lastVersionFiles
.distinct()
.filter(functions.column("added_snapshot_id").isInCollection(diffSnapshotIds))
.filter(
functions
.column(ManifestFile.SNAPSHOT_ID.name())
.isInCollection(deltaSnapshotIds))
.as(Encoders.STRING())
.collectAsList());
}
} catch (Exception e) {
throw new UnsupportedOperationException(
"Failed to build the manifest files dataframe, the end version you are "
+ "trying to copy may contain invalid snapshots, please a younger version that doesn't have invalid "
+ "snapshots",
"Unable to build the manifest files dataframe. The end version in use may contain invalid snapshots. "
+ "Please choose an earlier version without invalid snapshots.",
e);
}
}

public static class RewriteDeleteFileResult extends RewriteResult<DeleteFile> {
public RewriteDeleteFileResult append(RewriteDeleteFileResult r1) {
public static class RewriteContentFileResult extends RewriteResult<ContentFile<?>> {
public RewriteContentFileResult append(RewriteResult<ContentFile<?>> r1) {
this.copyPlan().addAll(r1.copyPlan());
this.toRewrite().addAll(r1.toRewrite());
return this;
}

public RewriteContentFileResult appendDataFile(RewriteResult<DataFile> r1) {
this.copyPlan().addAll(r1.copyPlan());
this.toRewrite().addAll(r1.toRewrite());
return this;
}

public RewriteContentFileResult appendDeleteFile(RewriteResult<DeleteFile> r1) {
this.copyPlan().addAll(r1.copyPlan());
this.toRewrite().addAll(r1.toRewrite());
return this;
}
}

/** Rewrite manifest files in a distributed manner and return rewritten data files path pairs. */
private RewriteResult<DeleteFile> rewriteManifests(
private RewriteContentFileResult rewriteManifests(
TableMetadata tableMetadata, Set<ManifestFile> toRewrite) {
if (toRewrite.isEmpty()) {
return new RewriteResult<>();
return new RewriteContentFileResult();
}

Encoder<ManifestFile> manifestFileEncoder = Encoders.javaSerialization(ManifestFile.class);
Expand All @@ -454,13 +469,13 @@ private RewriteResult<DeleteFile> rewriteManifests(
specsById,
sourcePrefix,
targetPrefix),
Encoders.bean(RewriteDeleteFileResult.class))
Encoders.bean(RewriteContentFileResult.class))
// duplicates are expected here as the same data file can have different statuses
// (e.g. added and deleted)
.reduce((ReduceFunction<RewriteDeleteFileResult>) RewriteDeleteFileResult::append);
.reduce((ReduceFunction<RewriteContentFileResult>) RewriteContentFileResult::append);
}

private static MapFunction<ManifestFile, RewriteDeleteFileResult> toManifests(
private static MapFunction<ManifestFile, RewriteContentFileResult> toManifests(
Broadcast<Table> tableBroadcast,
String stagingLocation,
int format,
Expand All @@ -469,23 +484,21 @@ private static MapFunction<ManifestFile, RewriteDeleteFileResult> toManifests(
String targetPrefix) {

return manifestFile -> {
RewriteDeleteFileResult result = new RewriteDeleteFileResult();
RewriteContentFileResult result = new RewriteContentFileResult();
switch (manifestFile.content()) {
case DATA:
result
.copyPlan()
.addAll(
writeDataManifest(
manifestFile,
tableBroadcast,
stagingLocation,
format,
specsById,
sourcePrefix,
targetPrefix));
result.appendDataFile(
writeDataManifest(
manifestFile,
tableBroadcast,
stagingLocation,
format,
specsById,
sourcePrefix,
targetPrefix));
break;
case DELETES:
result.append(
result.appendDeleteFile(
writeDeleteManifest(
manifestFile,
tableBroadcast,
Expand All @@ -503,7 +516,7 @@ private static MapFunction<ManifestFile, RewriteDeleteFileResult> toManifests(
};
}

private static List<Pair<String, String>> writeDataManifest(
private static RewriteResult<DataFile> writeDataManifest(
ManifestFile manifestFile,
Broadcast<Table> tableBroadcast,
String stagingLocation,
Expand All @@ -516,10 +529,8 @@ private static List<Pair<String, String>> writeDataManifest(
FileIO io = tableBroadcast.getValue().io();
OutputFile outputFile = io.newOutputFile(stagingPath);
Map<Integer, PartitionSpec> specsById = specsByIdBroadcast.getValue();

return new ArrayList<>(
RewriteTablePathUtil.rewriteManifest(
manifestFile, outputFile, io, format, specsById, sourcePrefix, targetPrefix));
return RewriteTablePathUtil.rewriteDataManifest(
manifestFile, outputFile, io, format, specsById, sourcePrefix, targetPrefix);
} catch (IOException e) {
throw new RuntimeIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,25 +181,18 @@ public void testRewritePath() throws Exception {
.select("file_path")
.as(Encoders.STRING())
.collectAsList();
assertThat(validDataFilesAfterRebuilt.size()).isEqualTo(2);
for (String item : validDataFilesAfterRebuilt) {
assertThat(item).startsWith(targetTableLocation);
}
assertThat(validDataFilesAfterRebuilt)
.hasSize(2)
.allMatch(item -> item.startsWith(targetTableLocation));

// verify data rows
Dataset<Row> resultDF = spark.read().format("iceberg").load(targetTableLocation);
List<ThreeColumnRecord> actualRecords =
resultDF.sort("c1", "c2", "c3").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();

List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.add(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
expectedRecords.add(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));

assertThat(expectedRecords).isEqualTo(actualRecords);
List<Object[]> actual = rows(targetTableLocation);
List<Object[]> expected = rows(tableLocation);
assertEquals("Rows should match after copy", expected, actual);
}

@Test
public void testSameLocations() throws Exception {
public void testSameLocations() {
assertThatThrownBy(
() ->
actions()
Expand Down Expand Up @@ -546,9 +539,8 @@ public void testMoveVersionWithInvalidSnapshots() throws Exception {
.execute())
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining(
"Failed to build the manifest files dataframe, "
+ "the end version you are trying to copy may contain invalid snapshots, "
+ "please a younger version that doesn't have invalid snapshots");
"Unable to build the manifest files dataframe. The end version in use may contain invalid snapshots. "
+ "Please choose an earlier version without invalid snapshots.");
}

@Test
Expand Down Expand Up @@ -1000,4 +992,8 @@ private static String fileName(String path) {
/**
 * Returns the current {@link TableMetadata} for the given table.
 *
 * <p>Assumes {@code tbl} implements {@code HasTableOperations}; a table implementation that
 * does not will fail here with a {@code ClassCastException}.
 */
private TableMetadata currentMetadata(Table tbl) {
  return ((HasTableOperations) tbl).operations().current();
}

/**
 * Reads the Iceberg table at {@code location} through Spark and returns all of its rows
 * converted to Java object arrays (via the {@code rowsToJava} helper).
 *
 * <p>NOTE(review): row order is whatever Spark's {@code collectAsList} yields — callers that
 * compare results should not rely on ordering unless they sort first.
 */
private List<Object[]> rows(String location) {
  return rowsToJava(spark.read().format("iceberg").load(location).collectAsList());
}
}

0 comments on commit 33d53ce

Please sign in to comment.