Skip to content

Commit

Permalink
More review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
szehon-ho committed Dec 7, 2024
1 parent 6880510 commit 36b3365
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 50 deletions.
24 changes: 17 additions & 7 deletions core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ private static RewriteResult<DeleteFile> writeDeleteFileEntry(
result.toRewrite().add(file);
return result;
case EQUALITY_DELETES:
DeleteFile eqDeleteFile = newEqualityDeleteRecord(file, spec, sourcePrefix, targetPrefix);
DeleteFile eqDeleteFile = newEqualityDeleteEntry(file, spec, sourcePrefix, targetPrefix);
appendEntryWithFile(entry, writer, eqDeleteFile);
// we do not need to recursively rewrite the equality delete, just move it
result.copyPlan().add(Pair.of(file.location(), eqDeleteFile.location()));
Expand Down Expand Up @@ -412,7 +412,7 @@ private static <F extends ContentFile<F>> void appendEntryWithFile(
}
}

private static DeleteFile newEqualityDeleteRecord(
private static DeleteFile newEqualityDeleteEntry(
DeleteFile file, PartitionSpec spec, String sourcePrefix, String targetPrefix) {
String path = file.location();

Expand Down Expand Up @@ -532,7 +532,8 @@ public static String newPath(String path, String sourcePrefix, String targetPref
return combinePaths(targetPrefix, relativize(path, sourcePrefix));
}

private static String combinePaths(String absolutePath, String relativePath) {
/** Combine a base and relative path. */
public static String combinePaths(String absolutePath, String relativePath) {
String combined = absolutePath;
if (!combined.endsWith("/")) {
combined += "/";
Expand All @@ -541,7 +542,8 @@ private static String combinePaths(String absolutePath, String relativePath) {
return combined;
}

private static String fileName(String path) {
/** Returns the file name of a path. */
public static String fileName(String path) {
String filename = path;
int lastIndex = path.lastIndexOf(File.separator);
if (lastIndex != -1) {
Expand All @@ -550,7 +552,8 @@ private static String fileName(String path) {
return filename;
}

private static String relativize(String path, String prefix) {
/** Relativize a path. */
public static String relativize(String path, String prefix) {
String toRemove = prefix;
if (!toRemove.endsWith("/")) {
toRemove += "/";
Expand All @@ -562,7 +565,14 @@ private static String relativize(String path, String prefix) {
return path.substring(toRemove.length());
}

private static String stagingPath(String originalPath, String stagingLocation) {
return stagingLocation + fileName(originalPath);
/**
* Construct a staging path under a given staging directory
*
* @param originalPath source path
* @param stagingDir staging directory
* @return a staging path under the staging directory, based on the original path
*/
public static String stagingPath(String originalPath, String stagingDir) {
return stagingDir + fileName(originalPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private Result doExecute() {
return ImmutableRewriteTablePath.Result.builder()
.stagingLocation(stagingDir)
.fileListLocation(resultLocation)
.latestVersion(fileName(endVersionName))
.latestVersion(RewriteTablePathUtil.fileName(endVersionName))
.build();
}

Expand Down Expand Up @@ -230,7 +230,7 @@ private String versionFile(TableMetadata metadata, String versionFileName) {
}

private boolean versionInFilePath(String path, String version) {
return fileName(path).equals(version);
return RewriteTablePathUtil.fileName(path).equals(version);
}

private String jobDesc() {
Expand Down Expand Up @@ -357,7 +357,7 @@ private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata endMetadata) {
}

private Pair<String, String> rewriteVersionFile(TableMetadata metadata, String versionFilePath) {
String stagingPath = stagingPath(versionFilePath, stagingDir);
String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, stagingDir);
TableMetadata newTableMetadata =
RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, targetPrefix);
TableMetadataParser.overwrite(newTableMetadata, table.io().newOutputFile(stagingPath));
Expand All @@ -378,7 +378,7 @@ private RewriteResult<ManifestFile> rewriteManifestList(
RewriteResult<ManifestFile> result = new RewriteResult<>();

String path = snapshot.manifestListLocation();
String outputPath = stagingPath(path, stagingDir);
String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir);
RewriteResult<ManifestFile> rewriteResult =
RewriteTablePathUtil.rewriteManifestList(
snapshot,
Expand Down Expand Up @@ -512,7 +512,7 @@ private static List<Pair<String, String>> writeDataManifest(
String sourcePrefix,
String targetPrefix) {
try {
String stagingPath = stagingPath(manifestFile.path(), stagingLocation);
String stagingPath = RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
FileIO io = tableBroadcast.getValue().io();
OutputFile outputFile = io.newOutputFile(stagingPath);
Map<Integer, PartitionSpec> specsById = specsByIdBroadcast.getValue();
Expand All @@ -534,7 +534,7 @@ private static RewriteResult<DeleteFile> writeDeleteManifest(
String sourcePrefix,
String targetPrefix) {
try {
String stagingPath = stagingPath(manifestFile.path(), stagingLocation);
String stagingPath = RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation);
FileIO io = tableBroadcast.getValue().io();
OutputFile outputFile = io.newOutputFile(stagingPath);
Map<Integer, PartitionSpec> specsById = specsByIdBroadcast.getValue();
Expand Down Expand Up @@ -606,11 +606,17 @@ private ForeachFunction<DeleteFile> rewritePositionDelete(
PositionDeleteReaderWriter posDeleteReaderWriter) {
return deleteFile -> {
FileIO io = tableBroadcast.getValue().io();
String newPath = stagingPath(deleteFile.location(), stagingLocationArg);
String newPath = RewriteTablePathUtil.stagingPath(deleteFile.location(), stagingLocationArg);
OutputFile outputFile = io.newOutputFile(newPath);
PartitionSpec spec = specsById.getValue().get(deleteFile.specId());
RewriteTablePathUtil.rewritePositionDeleteFile(
deleteFile, outputFile, io, spec, sourcePrefixArg, targetPrefixArg, posDeleteReaderWriter);
deleteFile,
outputFile,
io,
spec,
sourcePrefixArg,
targetPrefixArg,
posDeleteReaderWriter);
};
}

Expand Down Expand Up @@ -693,42 +699,9 @@ private boolean fileExist(String path) {
return table.io().newInputFile(path).exists();
}

private static String relativize(String path, String prefix) {
String toRemove = prefix;
if (!toRemove.endsWith("/")) {
toRemove += "/";
}
if (!path.startsWith(toRemove)) {
throw new IllegalArgumentException(
String.format("Path %s does not start with %s", path, toRemove));
}
return path.substring(toRemove.length());
}

private static String newPath(String path, String sourcePrefix, String targetPrefix) {
return combinePaths(targetPrefix, relativize(path, sourcePrefix));
}

private static String stagingPath(String originalPath, String stagingLocation) {
return stagingLocation + fileName(originalPath);
}

private static String combinePaths(String absolutePath, String relativePath) {
String combined = absolutePath;
if (!combined.endsWith("/")) {
combined += "/";
}
combined += relativePath;
return combined;
}

private static String fileName(String path) {
String filename = path;
int lastIndex = path.lastIndexOf(File.separator);
if (lastIndex != -1) {
filename = path.substring(lastIndex + 1);
}
return filename;
return RewriteTablePathUtil.combinePaths(
targetPrefix, RewriteTablePathUtil.relativize(path, sourcePrefix));
}

private String getMetadataLocation(Table tbl) {
Expand Down

0 comments on commit 36b3365

Please sign in to comment.