From fceea89cb4a8f781641aa65456801b1cd40d0d03 Mon Sep 17 00:00:00 2001
From: Anton Okolnychyi
Date: Mon, 30 Oct 2023 10:53:23 -0700
Subject: [PATCH] Spark 3.5: Don't cache or reuse manifest entries while
 rewriting metadata by default (#8935)

The action for rewriting manifests caches the manifest entry DF or
performs an extra round-robin shuffle so that the actual manifest files
are read only once. This was done on the assumption that it would
improve performance. However, caching performs poorly for larger tables
because it requires substantial cluster resources, and the round-robin
repartition is expensive because the entries must be written to disk.
That extra write is more expensive than the extra read required by the
range-based shuffle of manifest entries. Therefore, this change
disables caching by default and removes the optional round-robin
repartition step. Instead, the manifests are read twice; that step is
distributed and scales well even for tables with huge metadata. The new
approach should be both faster and more robust.
---
 .../actions/RewriteManifestsSparkAction.java  | 13 +------
 .../actions/TestRewriteManifestsAction.java   | 38 +++++++++++++++----
 2 files changed, 32 insertions(+), 19 deletions(-)

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 06a5c8c5720f..c0b6d3fe1712 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -56,7 +56,6 @@
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
-import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Column;
@@ -65,7 +64,6 @@
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,7 +80,7 @@ public class RewriteManifestsSparkAction
     extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> implements RewriteManifests {
 
   public static final String USE_CACHING = "use-caching";
-  public static final boolean USE_CACHING_DEFAULT = true;
+  public static final boolean USE_CACHING_DEFAULT = false;
 
   private static final Logger LOG = LoggerFactory.getLogger(RewriteManifestsSparkAction.class);
 
@@ -277,16 +275,9 @@ private List<ManifestFile> writeManifestsForPartitionedTable(
   }
 
   private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
-    Dataset<T> reusableDS;
     boolean useCaching =
         PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
-    if (useCaching) {
-      reusableDS = ds.cache();
-    } else {
-      int parallelism = SQLConf.get().numShufflePartitions();
-      reusableDS =
-          ds.repartition(parallelism).map((MapFunction<T, T>) value -> value, ds.exprEnc());
-    }
+    Dataset<T> reusableDS = useCaching ? ds.cache() : ds;
 
     try {
       return func.apply(reusableDS);
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 64dbf42d4c8e..a2a8ec8a9fca 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -75,18 +75,25 @@ public class TestRewriteManifestsAction extends SparkTestBase {
           optional(2, "c2", Types.StringType.get()),
           optional(3, "c3", Types.StringType.get()));
 
-  @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}")
+  @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}, useCaching = {1}")
   public static Object[] parameters() {
-    return new Object[] {"true", "false"};
+    return new Object[][] {
+      new Object[] {"true", "true"},
+      new Object[] {"false", "true"},
+      new Object[] {"true", "false"},
+      new Object[] {"false", "false"}
+    };
   }
 
   @Rule public TemporaryFolder temp = new TemporaryFolder();
 
   private final String snapshotIdInheritanceEnabled;
+  private final String useCaching;
   private String tableLocation = null;
 
-  public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) {
+  public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled, String useCaching) {
     this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
+    this.useCaching = useCaching;
   }
 
   @Before
@@ -109,6 +116,7 @@ public void testRewriteManifestsEmptyTable() throws IOException {
     actions
         .rewriteManifests(table)
         .rewriteIf(manifest -> true)
+        .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
         .stagingLocation(temp.newFolder().toString())
         .execute();
 
@@ -141,7 +149,11 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
     SparkActions actions = SparkActions.get();
 
     RewriteManifests.Result result =
-        actions.rewriteManifests(table).rewriteIf(manifest -> true).execute();
+        actions
+            .rewriteManifests(table)
+            .rewriteIf(manifest -> true)
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+            .execute();
 
     Assert.assertEquals(
         "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
@@ -281,7 +293,11 @@ public void testRewriteSmallManifestsPartitionedTable() {
         .commit();
 
     RewriteManifests.Result result =
-        actions.rewriteManifests(table).rewriteIf(manifest -> true).execute();
+        actions
+            .rewriteManifests(table)
+            .rewriteIf(manifest -> true)
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+            .execute();
 
     Assert.assertEquals(
         "Action should rewrite 4 manifests", 4, Iterables.size(result.rewrittenManifests()));
@@ -354,6 +370,7 @@ public void testRewriteImportedManifests() throws IOException {
           actions
               .rewriteManifests(table)
               .rewriteIf(manifest -> true)
+              .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
               .stagingLocation(temp.newFolder().toString())
               .execute();
 
@@ -404,6 +421,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException {
         actions
             .rewriteManifests(table)
             .rewriteIf(manifest -> true)
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
             .stagingLocation(temp.newFolder().toString())
             .execute();
 
@@ -451,7 +469,7 @@ public void testRewriteManifestsWithPredicate() throws IOException {
 
     SparkActions actions = SparkActions.get();
 
-    // rewrite only the first manifest without caching
+    // rewrite only the first manifest
     RewriteManifests.Result result =
         actions
             .rewriteManifests(table)
@@ -460,7 +478,7 @@ public void testRewriteManifestsWithPredicate() throws IOException {
                     (manifest.path().equals(manifests.get(0).path())
                         || (manifest.path().equals(manifests.get(1).path()))))
             .stagingLocation(temp.newFolder().toString())
-            .option("use-caching", "false")
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
             .execute();
 
     Assert.assertEquals(
@@ -519,7 +537,11 @@ public void testRewriteSmallManifestsNonPartitionedV2Table() {
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
-    RewriteManifests.Result result = actions.rewriteManifests(table).execute();
+    RewriteManifests.Result result =
+        actions
+            .rewriteManifests(table)
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+            .execute();
 
     Assert.assertEquals(
         "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
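
For completeness, here is a minimal caller-side sketch of the behavior change, using only the public action API that the tests above exercise (SparkActions.get(), rewriteManifests, option, execute). It assumes an active SparkSession and an already-loaded Iceberg Table passed in as `table`; the class and method names below are illustrative and not part of the patch.

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.spark.actions.RewriteManifestsSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;

public class RewriteManifestsCachingExample {

  // Default path after this change: no caching and no round-robin repartition of
  // manifest entries; the manifests are simply read twice by the distributed job.
  static RewriteManifests.Result rewriteWithDefaults(Table table) {
    return SparkActions.get().rewriteManifests(table).execute();
  }

  // Callers who still want the previous behavior can opt back into caching
  // explicitly through the "use-caching" option.
  static RewriteManifests.Result rewriteWithCaching(Table table) {
    return SparkActions.get()
        .rewriteManifests(table)
        .option(RewriteManifestsSparkAction.USE_CACHING, "true")
        .execute();
  }
}

Reading the manifests twice trades a second distributed scan for not having to persist the entry dataset or spill it to disk during a round-robin shuffle, which the commit message argues is the cheaper side of that trade for large tables.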