diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 06a5c8c5720f..c0b6d3fe1712 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -56,7 +56,6 @@ import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; -import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapPartitionsFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.Column; @@ -65,7 +64,6 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +80,7 @@ public class RewriteManifestsSparkAction extends BaseSnapshotUpdateSparkAction implements RewriteManifests { public static final String USE_CACHING = "use-caching"; - public static final boolean USE_CACHING_DEFAULT = true; + public static final boolean USE_CACHING_DEFAULT = false; private static final Logger LOG = LoggerFactory.getLogger(RewriteManifestsSparkAction.class); @@ -277,16 +275,9 @@ private List writeManifestsForPartitionedTable( } private U withReusableDS(Dataset ds, Function, U> func) { - Dataset reusableDS; boolean useCaching = PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT); - if (useCaching) { - reusableDS = ds.cache(); - } else { - int parallelism = SQLConf.get().numShufflePartitions(); - reusableDS = - ds.repartition(parallelism).map((MapFunction) value -> value, ds.exprEnc()); - } + Dataset reusableDS = useCaching ? ds.cache() : ds; try { return func.apply(reusableDS); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index 64dbf42d4c8e..a2a8ec8a9fca 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -75,18 +75,25 @@ public class TestRewriteManifestsAction extends SparkTestBase { optional(2, "c2", Types.StringType.get()), optional(3, "c3", Types.StringType.get())); - @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}") + @Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}, useCaching = {1}") public static Object[] parameters() { - return new Object[] {"true", "false"}; + return new Object[][] { + new Object[] {"true", "true"}, + new Object[] {"false", "true"}, + new Object[] {"true", "false"}, + new Object[] {"false", "false"} + }; } @Rule public TemporaryFolder temp = new TemporaryFolder(); private final String snapshotIdInheritanceEnabled; + private final String useCaching; private String tableLocation = null; - public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) { + public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled, String useCaching) { this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled; + this.useCaching = useCaching; } @Before @@ -109,6 +116,7 @@ public void testRewriteManifestsEmptyTable() throws IOException { actions .rewriteManifests(table) .rewriteIf(manifest -> true) + .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) .stagingLocation(temp.newFolder().toString()) .execute(); @@ -141,7 +149,11 @@ public void testRewriteSmallManifestsNonPartitionedTable() { SparkActions actions = SparkActions.get(); RewriteManifests.Result result = - actions.rewriteManifests(table).rewriteIf(manifest -> true).execute(); + actions + .rewriteManifests(table) + .rewriteIf(manifest -> true) + .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) + .execute(); Assert.assertEquals( "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); @@ -281,7 +293,11 @@ public void testRewriteSmallManifestsPartitionedTable() { .commit(); RewriteManifests.Result result = - actions.rewriteManifests(table).rewriteIf(manifest -> true).execute(); + actions + .rewriteManifests(table) + .rewriteIf(manifest -> true) + .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) + .execute(); Assert.assertEquals( "Action should rewrite 4 manifests", 4, Iterables.size(result.rewrittenManifests())); @@ -354,6 +370,7 @@ public void testRewriteImportedManifests() throws IOException { actions .rewriteManifests(table) .rewriteIf(manifest -> true) + .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) .stagingLocation(temp.newFolder().toString()) .execute(); @@ -404,6 +421,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException { actions .rewriteManifests(table) .rewriteIf(manifest -> true) + .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) .stagingLocation(temp.newFolder().toString()) .execute(); @@ -451,7 +469,7 @@ public void testRewriteManifestsWithPredicate() throws IOException { SparkActions actions = SparkActions.get(); - // rewrite only the first manifest without caching + // rewrite only the first manifest RewriteManifests.Result result = actions .rewriteManifests(table) @@ -460,7 +478,7 @@ public void testRewriteManifestsWithPredicate() throws IOException { (manifest.path().equals(manifests.get(0).path()) || (manifest.path().equals(manifests.get(1).path())))) .stagingLocation(temp.newFolder().toString()) - .option("use-caching", "false") + .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) .execute(); Assert.assertEquals( @@ -519,7 +537,11 @@ public void testRewriteSmallManifestsNonPartitionedV2Table() { Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size()); SparkActions actions = SparkActions.get(); - RewriteManifests.Result result = actions.rewriteManifests(table).execute(); + RewriteManifests.Result result = + actions + .rewriteManifests(table) + .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) + .execute(); Assert.assertEquals( "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals(