Spark 3.4: Fix rewriting manifests for evolved unpartitioned V1 tables (#9599)

This change backports #9015 to Spark 3.4.
aokolnychyi authored Feb 1, 2024
1 parent e8c197d commit 9a0191e
Showing 2 changed files with 61 additions and 4 deletions.
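
Context (editorial note, not part of the commit): in a V1 table, dropping a partition field does not shrink the spec; the dropped field is kept as a void transform so that partition tuple positions stay stable. An evolved, effectively unpartitioned spec therefore still has one field, and the old check spec.fields().size() < 1 sent such tables down the partitioned rewrite path. spec.isUnpartitioned() treats a spec whose fields are all void transforms as unpartitioned. A minimal sketch of the distinction, with an illustrative schema and class name:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

class VoidSpecSketch {
  public static void main(String[] args) {
    Schema schema =
        new Schema(
            Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
            Types.NestedField.optional(2, "c3", Types.StringType.get()));

    // alwaysNull() adds a void-transform field, mirroring what dropping the
    // last partition field leaves behind on a V1 table.
    PartitionSpec evolved = PartitionSpec.builderFor(schema).alwaysNull("c3").build();

    System.out.println(evolved.fields().size());   // prints 1: the old "size() < 1" check says partitioned
    System.out.println(evolved.isUnpartitioned()); // prints true: the new check picks the unpartitioned path
  }
}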
@@ -179,7 +179,7 @@ private RewriteManifests.Result doExecute() {
     Dataset<Row> manifestEntryDF = buildManifestEntryDF(matchingManifests);

     List<ManifestFile> newManifests;
-    if (spec.fields().size() < 1) {
+    if (spec.isUnpartitioned()) {
       newManifests = writeManifestsForUnpartitionedTable(manifestEntryDF, targetNumManifests);
     } else {
       newManifests = writeManifestsForPartitionedTable(manifestEntryDF, targetNumManifests);
@@ -44,8 +44,10 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
@@ -598,6 +600,55 @@ public void testRewriteSmallManifestsNonPartitionedV2Table() {
     Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
   }

+  @Test
+  public void testRewriteLargeManifestsEvolvedUnpartitionedV1Table() throws IOException {
+    assumeThat(formatVersion).isEqualTo(1);
+
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+    options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    table.updateSpec().removeField("c3").commit();
+
+    assertThat(table.spec().fields()).hasSize(1).allMatch(field -> field.transform().isVoid());
+
+    List<DataFile> dataFiles = Lists.newArrayList();
+    for (int fileOrdinal = 0; fileOrdinal < 1000; fileOrdinal++) {
+      dataFiles.add(newDataFile(table, TestHelpers.Row.of(new Object[] {null})));
+    }
+    ManifestFile appendManifest = writeManifest(table, dataFiles);
+    table.newFastAppend().appendManifest(appendManifest).commit();
+
+    List<ManifestFile> originalManifests = table.currentSnapshot().allManifests(table.io());
+    ManifestFile originalManifest = Iterables.getOnlyElement(originalManifests);
+
+    // set the target manifest size to a small value to force splitting records into multiple files
+    table
+        .updateProperties()
+        .set(
+            TableProperties.MANIFEST_TARGET_SIZE_BYTES,
+            String.valueOf(originalManifest.length() / 2))
+        .commit();
+
+    SparkActions actions = SparkActions.get();
+
+    RewriteManifests.Result result =
+        actions
+            .rewriteManifests(table)
+            .rewriteIf(manifest -> true)
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+            .execute();
+
+    assertThat(result.rewrittenManifests()).hasSize(1);
+    assertThat(result.addedManifests()).hasSizeGreaterThanOrEqualTo(2);
+    assertManifestsLocation(result.addedManifests());
+
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
+    assertThat(manifests).hasSizeGreaterThanOrEqualTo(2);
+  }
+
   private void writeRecords(List<ThreeColumnRecord> records) {
     Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
     writeDF(df);
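
Context (editorial note, not part of the commit): after removeField("c3"), a V1 spec keeps the dropped field as a void transform, so every data file still carries a one-slot partition tuple whose value is always null. That is why the test above builds files with TestHelpers.Row.of(new Object[] {null}), relying on the StructLike overload added in the next hunk. A minimal sketch, assuming the test class's table and helpers:

// The void field still occupies one position in a V1 partition tuple,
// so each data file gets a single null partition slot.
StructLike nullPartition = TestHelpers.Row.of(new Object[] {null});
DataFile file = newDataFile(table, nullPartition);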
@@ -657,11 +708,17 @@ private ManifestFile writeManifest(Table table, List<DataFile> files) throws IOException {
   }

   private DataFile newDataFile(Table table, String partitionPath) {
+    return newDataFileBuilder(table).withPartitionPath(partitionPath).build();
+  }
+
+  private DataFile newDataFile(Table table, StructLike partition) {
+    return newDataFileBuilder(table).withPartition(partition).build();
+  }
+
+  private DataFiles.Builder newDataFileBuilder(Table table) {
     return DataFiles.builder(table.spec())
         .withPath("/path/to/data-" + UUID.randomUUID() + ".parquet")
         .withFileSizeInBytes(10)
-        .withPartitionPath(partitionPath)
-        .withRecordCount(1)
-        .build();
+        .withRecordCount(1);
   }
 }
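
Context (editorial note, not part of the commit): extracting newDataFileBuilder lets the two newDataFile overloads share the path, file size, and record count defaults and differ only in how the partition is attached. A usage sketch with illustrative values:

// Partition supplied as a path string (pre-existing overload) ...
DataFile byPath = newDataFile(table, "c3=AAAA");
// ... or as a StructLike tuple (new overload), matching the null tuple of an
// evolved unpartitioned V1 spec.
DataFile byTuple = newDataFile(table, TestHelpers.Row.of(new Object[] {null}));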
