Spark 3.5: Use rolling manifest writers when optimizing metadata (#8972)
aokolnychyi authored Nov 4, 2023
1 parent 2c89010 commit b0bf62a
Showing 2 changed files with 134 additions and 158 deletions.
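In short: instead of pre-computing a per-manifest entry cap and splitting each Spark partition into at most two manifests, the action now hands every partition's entries to a RollingManifestWriter, which closes the current manifest and opens a new one once a size threshold is crossed. A minimal sketch of the rolling pattern this diff adopts (the supplier and maxManifestSizeBytes mirror the ManifestWriterFactory added below; the row layout follows the manifest-entry DataFrame used by this action):

// Sketch only, not the verbatim change: one rolling writer per Spark task.
RollingManifestWriter<DataFile> writer =
    new RollingManifestWriter<>(this::newManifestWriter, maxManifestSizeBytes);
try {
  while (rows.hasNext()) {
    Row row = rows.next();
    // wrap the Spark row as a DataFile and append it as an existing entry;
    // the writer rolls to a fresh manifest file when the size threshold is hit
    writer.existing(
        wrapper.wrap(row.getStruct(3)),              // data_file struct
        row.getLong(0),                              // snapshot id
        row.getLong(1),                              // data sequence number
        row.isNullAt(2) ? null : row.getLong(2));    // file sequence number
  }
} finally {
  writer.close();
}
return writer.toManifestFiles().iterator(); // one or more manifests per task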
@@ -20,8 +20,7 @@
 
 import static org.apache.iceberg.MetadataTableType.ENTRIES;
 
-import java.io.IOException;
-import java.util.Collections;
+import java.io.Serializable;
 import java.util.List;
 import java.util.UUID;
 import java.util.function.Function;
@@ -36,6 +35,7 @@
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.RollingManifestWriter;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -167,20 +167,7 @@ private RewriteManifests.Result doExecute() {
           .build();
     }
 
-    long totalSizeBytes = 0L;
-    int numEntries = 0;
-
-    for (ManifestFile manifest : matchingManifests) {
-      ValidationException.check(
-          hasFileCounts(manifest), "No file counts in manifest: %s", manifest.path());
-
-      totalSizeBytes += manifest.length();
-      numEntries +=
-          manifest.addedFilesCount() + manifest.existingFilesCount() + manifest.deletedFilesCount();
-    }
-
-    int targetNumManifests = targetNumManifests(totalSizeBytes);
-    int targetNumManifestEntries = targetNumManifestEntries(numEntries, targetNumManifests);
+    int targetNumManifests = targetNumManifests(totalSizeBytes(matchingManifests));
 
     if (targetNumManifests == 1 && matchingManifests.size() == 1) {
       return ImmutableRewriteManifests.Result.builder()
@@ -195,9 +182,7 @@ private RewriteManifests.Result doExecute() {
     if (spec.fields().size() < 1) {
       newManifests = writeManifestsForUnpartitionedTable(manifestEntryDF, targetNumManifests);
     } else {
-      newManifests =
-          writeManifestsForPartitionedTable(
-              manifestEntryDF, targetNumManifests, targetNumManifestEntries);
+      newManifests = writeManifestsForPartitionedTable(manifestEntryDF, targetNumManifests);
     }
 
     replaceManifests(matchingManifests, newManifests);
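With the entry-count heuristic gone, doExecute sizes the rewrite purely from manifest bytes: targetNumManifests is a ceiling division over the target manifest size. A worked snippet (the 8 MB figure is Iceberg's usual default target, assumed here for illustration):

// Ceiling division as in targetNumManifests(totalSizeBytes):
// 100 MB of matching manifests at an 8 MB target -> 13 output manifests.
long totalSizeBytes = 100L * 1024 * 1024;
long targetManifestSizeBytes = 8L * 1024 * 1024; // assumed default target
int targetNumManifests =
    (int) ((totalSizeBytes + targetManifestSizeBytes - 1) / targetManifestSizeBytes);
// targetNumManifests == 13, i.e. ceil(100 / 8)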
@@ -233,41 +218,24 @@ private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
   private List<ManifestFile> writeManifestsForUnpartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests) {
 
-    Broadcast<Table> tableBroadcast =
-        sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
-
-    // we rely only on the target number of manifests for unpartitioned tables
-    // as we should not worry about having too much metadata per partition
-    long maxNumManifestEntries = Long.MAX_VALUE;
+    Types.StructType partitionType = spec.partitionType();
 
     return manifestEntryDF
         .repartition(numManifests)
         .mapPartitions(
-            toManifests(
-                tableBroadcast,
-                maxNumManifestEntries,
-                outputLocation,
-                formatVersion,
-                combinedPartitionType,
-                spec,
-                sparkType),
+            toManifests(manifestWriters(), combinedPartitionType, partitionType, sparkType),
             manifestEncoder)
         .collectAsList();
   }
 
   private List<ManifestFile> writeManifestsForPartitionedTable(
-      Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {
+      Dataset<Row> manifestEntryDF, int numManifests) {
 
-    Broadcast<Table> tableBroadcast =
-        sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
-
-    // we allow the actual size of manifests to be 10% higher if the estimation is not precise
-    // enough
-    long maxNumManifestEntries = (long) (1.1 * targetNumManifestEntries);
+    Types.StructType partitionType = spec.partitionType();
 
     return withReusableDS(
         manifestEntryDF,
@@ -276,14 +244,7 @@ private List<ManifestFile> writeManifestsForPartitionedTable(
           return df.repartitionByRange(numManifests, partitionColumn)
               .sortWithinPartitions(partitionColumn)
               .mapPartitions(
-                  toManifests(
-                      tableBroadcast,
-                      maxNumManifestEntries,
-                      outputLocation,
-                      formatVersion,
-                      combinedPartitionType,
-                      spec,
-                      sparkType),
+                  toManifests(manifestWriters(), combinedPartitionType, partitionType, sparkType),
                   manifestEncoder)
               .collectAsList();
         });
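For partitioned tables, entries are range-partitioned and sorted on the partition column, so each Spark task receives a contiguous, ordered slice of table partitions and its rolling writer emits manifests clustered by partition. The shape of that pipeline as a sketch (partitionColumn is assumed to be the data_file.partition column defined elsewhere in this class):

// Sketch: cluster entries by partition before handing each slice to a task.
List<ManifestFile> newManifests =
    df.repartitionByRange(numManifests, partitionColumn) // contiguous ranges per task
        .sortWithinPartitions(partitionColumn)           // ordered within each task
        .mapPartitions(                                  // one rolling writer per task
            toManifests(manifestWriters(), combinedPartitionType, partitionType, sparkType),
            manifestEncoder)
        .collectAsList();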
@@ -319,8 +280,16 @@ private int targetNumManifests(long totalSizeBytes) {
     return (int) ((totalSizeBytes + targetManifestSizeBytes - 1) / targetManifestSizeBytes);
   }
 
-  private int targetNumManifestEntries(int numEntries, int numManifests) {
-    return (numEntries + numManifests - 1) / numManifests;
+  private long totalSizeBytes(Iterable<ManifestFile> manifests) {
+    long totalSizeBytes = 0L;
+
+    for (ManifestFile manifest : manifests) {
+      ValidationException.check(
+          hasFileCounts(manifest), "No file counts in manifest: %s", manifest.path());
+      totalSizeBytes += manifest.length();
+    }
+
+    return totalSizeBytes;
   }
 
   private boolean hasFileCounts(ManifestFile manifest) {
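The size-only estimate still validates that each matching manifest carries file counts before trusting its length. The hasFileCounts helper is unchanged by this commit and truncated by the diff context; presumably it just checks that the count fields are populated, along these lines:

// Assumed shape of the unchanged hasFileCounts guard (not part of this diff):
private boolean hasFileCounts(ManifestFile manifest) {
  return manifest.addedFilesCount() != null
      && manifest.existingFilesCount() != null
      && manifest.deletedFilesCount() != null;
}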
@@ -360,104 +329,90 @@ private void deleteFiles(Iterable<String> locations) {
         .run(location -> table.io().deleteFile(location));
   }
 
-  private static ManifestFile writeManifest(
-      List<Row> rows,
-      int startIndex,
-      int endIndex,
-      Broadcast<Table> tableBroadcast,
-      String location,
-      int format,
-      Types.StructType combinedPartitionType,
-      PartitionSpec spec,
-      StructType sparkType)
-      throws IOException {
+  private ManifestWriterFactory manifestWriters() {
+    return new ManifestWriterFactory(
+        sparkContext().broadcast(SerializableTableWithSize.copyOf(table)),
+        formatVersion,
+        spec.specId(),
+        outputLocation,
+        // allow the actual size of manifests to be 20% higher as the estimation is not precise
+        (long) (1.2 * targetManifestSizeBytes));
+  }
 
-    String manifestName = "optimized-m-" + UUID.randomUUID();
-    Path manifestPath = new Path(location, manifestName);
-    OutputFile outputFile =
-        tableBroadcast
-            .value()
-            .io()
-            .newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
+  private static MapPartitionsFunction<Row, ManifestFile> toManifests(
+      ManifestWriterFactory writers,
+      Types.StructType combinedPartitionType,
+      Types.StructType partitionType,
+      StructType sparkType) {
 
-    Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
-    Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
+    return rows -> {
+      Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+      Types.StructType manifestFileType = DataFile.getType(partitionType);
+      SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
+
+      RollingManifestWriter<DataFile> writer = writers.newRollingManifestWriter();
+
+      try {
+        while (rows.hasNext()) {
+          Row row = rows.next();
+          long snapshotId = row.getLong(0);
+          long sequenceNumber = row.getLong(1);
+          Long fileSequenceNumber = row.isNullAt(2) ? null : row.getLong(2);
+          Row file = row.getStruct(3);
+          writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber, fileSequenceNumber);
+        }
+      } finally {
+        writer.close();
+      }
 
-    ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
+      return writer.toManifestFiles().iterator();
+    };
+  }
 
-    try {
-      for (int index = startIndex; index < endIndex; index++) {
-        Row row = rows.get(index);
-        long snapshotId = row.getLong(0);
-        long sequenceNumber = row.getLong(1);
-        Long fileSequenceNumber = row.isNullAt(2) ? null : row.getLong(2);
-        Row file = row.getStruct(3);
-        writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber, fileSequenceNumber);
-      }
-    } finally {
-      writer.close();
-    }
+  private static class ManifestWriterFactory implements Serializable {
+    private final Broadcast<Table> tableBroadcast;
+    private final int formatVersion;
+    private final int specId;
+    private final String outputLocation;
+    private final long maxManifestSizeBytes;
+
+    ManifestWriterFactory(
+        Broadcast<Table> tableBroadcast,
+        int formatVersion,
+        int specId,
+        String outputLocation,
+        long maxManifestSizeBytes) {
+      this.tableBroadcast = tableBroadcast;
+      this.formatVersion = formatVersion;
+      this.specId = specId;
+      this.outputLocation = outputLocation;
+      this.maxManifestSizeBytes = maxManifestSizeBytes;
+    }
 
-    return writer.toManifestFile();
-  }
+    public RollingManifestWriter<DataFile> newRollingManifestWriter() {
+      return new RollingManifestWriter<>(this::newManifestWriter, maxManifestSizeBytes);
+    }
 
-  private static MapPartitionsFunction<Row, ManifestFile> toManifests(
-      Broadcast<Table> tableBroadcast,
-      long maxNumManifestEntries,
-      String location,
-      int format,
-      Types.StructType combinedPartitionType,
-      PartitionSpec spec,
-      StructType sparkType) {
+    private ManifestWriter<DataFile> newManifestWriter() {
+      return ManifestFiles.write(formatVersion, spec(), newOutputFile(), null);
+    }
 
-    return rows -> {
-      List<Row> rowsAsList = Lists.newArrayList(rows);
+    private PartitionSpec spec() {
+      return table().specs().get(specId);
+    }
 
-      if (rowsAsList.isEmpty()) {
-        return Collections.emptyIterator();
-      }
+    private OutputFile newOutputFile() {
+      return table().io().newOutputFile(newManifestLocation());
+    }
 
-      List<ManifestFile> manifests = Lists.newArrayList();
-      if (rowsAsList.size() <= maxNumManifestEntries) {
-        manifests.add(
-            writeManifest(
-                rowsAsList,
-                0,
-                rowsAsList.size(),
-                tableBroadcast,
-                location,
-                format,
-                combinedPartitionType,
-                spec,
-                sparkType));
-      } else {
-        int midIndex = rowsAsList.size() / 2;
-        manifests.add(
-            writeManifest(
-                rowsAsList,
-                0,
-                midIndex,
-                tableBroadcast,
-                location,
-                format,
-                combinedPartitionType,
-                spec,
-                sparkType));
-        manifests.add(
-            writeManifest(
-                rowsAsList,
-                midIndex,
-                rowsAsList.size(),
-                tableBroadcast,
-                location,
-                format,
-                combinedPartitionType,
-                spec,
-                sparkType));
-      }
+    private String newManifestLocation() {
+      String fileName = FileFormat.AVRO.addExtension("optimized-m-" + UUID.randomUUID());
+      Path filePath = new Path(outputLocation, fileName);
+      return filePath.toString();
+    }
 
-      return manifests.iterator();
-    };
-  }
+    private Table table() {
+      return tableBroadcast.value();
+    }
+  }
 }
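A note on the new ManifestWriterFactory: it implements Serializable because it travels to executors inside the mapPartitions closure, while the table itself ships once per executor via the broadcast, giving each task access to the table's FileIO without re-serializing the table. A hedged construction example (the format version and output location values are illustrative, not from this diff):

// Sketch: build the factory on the driver; Spark serializes it into the
// mapPartitions closure, and each task opens rolling writers from it.
ManifestWriterFactory writers =
    new ManifestWriterFactory(
        sparkContext().broadcast(SerializableTableWithSize.copyOf(table)),
        2,                                  // illustrative format version
        table.spec().specId(),              // write in the table's current spec
        table.location() + "/metadata",     // illustrative output location
        (long) (1.2 * (8L * 1024 * 1024))); // 20% headroom over an 8 MB target
RollingManifestWriter<DataFile> writer = writers.newRollingManifestWriter();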