From 9a0191e8de151864f27de00b30d6d1bae634a534 Mon Sep 17 00:00:00 2001
From: Anton Okolnychyi
Date: Thu, 1 Feb 2024 09:36:42 -0800
Subject: [PATCH] Spark 3.4: Fix rewriting manifests for evolved unpartitioned
 V1 tables (#9599)

This change backports #9015 to Spark 3.4.
---
 .../actions/RewriteManifestsSparkAction.java |  2 +-
 .../actions/TestRewriteManifestsAction.java  | 63 ++++++++++++++++++-
 2 files changed, 61 insertions(+), 4 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index af442ec300bc..0c08324a1a84 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -179,7 +179,7 @@ private RewriteManifests.Result doExecute() {
     Dataset<Row> manifestEntryDF = buildManifestEntryDF(matchingManifests);
 
     List<ManifestFile> newManifests;
-    if (spec.fields().size() < 1) {
+    if (spec.isUnpartitioned()) {
       newManifests = writeManifestsForUnpartitionedTable(manifestEntryDF, targetNumManifests);
     } else {
       newManifests = writeManifestsForPartitionedTable(manifestEntryDF, targetNumManifests);
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 9ae1954d68bb..d3932c2e82a8 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -44,8 +44,10 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
@@ -598,6 +600,55 @@ public void testRewriteSmallManifestsNonPartitionedV2Table() {
     Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteLargeManifestsEvolvedUnpartitionedV1Table() throws IOException {
+    assumeThat(formatVersion).isEqualTo(1);
+
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+    options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    table.updateSpec().removeField("c3").commit();
+
+    assertThat(table.spec().fields()).hasSize(1).allMatch(field -> field.transform().isVoid());
+
+    List<DataFile> dataFiles = Lists.newArrayList();
+    for (int fileOrdinal = 0; fileOrdinal < 1000; fileOrdinal++) {
+      dataFiles.add(newDataFile(table, TestHelpers.Row.of(new Object[] {null})));
+    }
+    ManifestFile appendManifest = writeManifest(table, dataFiles);
+    table.newFastAppend().appendManifest(appendManifest).commit();
+
+    List<ManifestFile> originalManifests = table.currentSnapshot().allManifests(table.io());
+    ManifestFile originalManifest = Iterables.getOnlyElement(originalManifests);
+
+    // set the target manifest size to a small value to force splitting records into multiple files
+    table
+        .updateProperties()
+        .set(
+            TableProperties.MANIFEST_TARGET_SIZE_BYTES,
+            String.valueOf(originalManifest.length() / 2))
+        .commit();
+
+    SparkActions actions = SparkActions.get();
+
+    RewriteManifests.Result result =
+        actions
+            .rewriteManifests(table)
+            .rewriteIf(manifest -> true)
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+            .execute();
+
+    assertThat(result.rewrittenManifests()).hasSize(1);
+    assertThat(result.addedManifests()).hasSizeGreaterThanOrEqualTo(2);
+    assertManifestsLocation(result.addedManifests());
+
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
+    assertThat(manifests).hasSizeGreaterThanOrEqualTo(2);
+  }
+
   private void writeRecords(List<ThreeColumnRecord> records) {
     Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
     writeDF(df);
@@ -657,11 +708,17 @@ private ManifestFile writeManifest(Table table, List<DataFile> files) throws IOE
   }
 
   private DataFile newDataFile(Table table, String partitionPath) {
+    return newDataFileBuilder(table).withPartitionPath(partitionPath).build();
+  }
+
+  private DataFile newDataFile(Table table, StructLike partition) {
+    return newDataFileBuilder(table).withPartition(partition).build();
+  }
+
+  private DataFiles.Builder newDataFileBuilder(Table table) {
     return DataFiles.builder(table.spec())
        .withPath("/path/to/data-" + UUID.randomUUID() + ".parquet")
        .withFileSizeInBytes(10)
-        .withPartitionPath(partitionPath)
-        .withRecordCount(1)
-        .build();
+        .withRecordCount(1);
  }
 }
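
Editor's note on why the one-line fix works: when a V1 table's partition spec is evolved to drop every partition field, V1 metadata keeps the dropped fields as void ("alwaysNull") transforms, so spec.fields() stays non-empty. The old check, spec.fields().size() < 1, therefore routed such tables down the partitioned write path, while PartitionSpec.isUnpartitioned() treats a spec whose fields are all void transforms as unpartitioned. The following standalone sketch is illustrative only and not part of the patch: the class name and inline three-column schema are made up to mirror the test's SCHEMA, and the builder's alwaysNull() is used to produce directly the void-transform shape the test creates via spec evolution.

// Illustrative sketch (assumptions noted above), not part of the patch.
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class EvolvedSpecCheck {
  public static void main(String[] args) {
    Schema schema =
        new Schema(
            Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
            Types.NestedField.optional(2, "c2", Types.StringType.get()),
            Types.NestedField.optional(3, "c3", Types.StringType.get()));

    // Same shape as an evolved unpartitioned V1 spec: the dropped
    // partition field survives as a void transform.
    PartitionSpec evolved = PartitionSpec.builderFor(schema).alwaysNull("c3").build();

    System.out.println(evolved.fields().size() < 1); // false -> old check takes the partitioned path
    System.out.println(evolved.isUnpartitioned());   // true  -> fixed check takes the unpartitioned path
  }
}

A spec with no fields at all passes both checks, so the fix only changes behavior for specs that were evolved to become unpartitioned, which is exactly the case the new test exercises.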