Skip to content

Commit

Permalink
Spark 3.3: Don't cache or reuse manifest entries while rewriting meta…
Browse files Browse the repository at this point in the history
…data by default
  • Loading branch information
aokolnychyi committed Oct 30, 2023
1 parent fceea89 commit 7b6d7da
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
Expand All @@ -65,7 +64,6 @@
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -82,7 +80,7 @@ public class RewriteManifestsSparkAction
extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> implements RewriteManifests {

public static final String USE_CACHING = "use-caching";
public static final boolean USE_CACHING_DEFAULT = true;
public static final boolean USE_CACHING_DEFAULT = false;

private static final Logger LOG = LoggerFactory.getLogger(RewriteManifestsSparkAction.class);

Expand Down Expand Up @@ -277,16 +275,9 @@ private List<ManifestFile> writeManifestsForPartitionedTable(
}

private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
Dataset<T> reusableDS;
boolean useCaching =
PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
if (useCaching) {
reusableDS = ds.cache();
} else {
int parallelism = SQLConf.get().numShufflePartitions();
reusableDS =
ds.repartition(parallelism).map((MapFunction<T, T>) value -> value, ds.exprEnc());
}
Dataset<T> reusableDS = useCaching ? ds.cache() : ds;

try {
return func.apply(reusableDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,25 @@ public class TestRewriteManifestsAction extends SparkTestBase {
optional(2, "c2", Types.StringType.get()),
optional(3, "c3", Types.StringType.get()));

@Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}")
@Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}, useCaching = {1}")
public static Object[] parameters() {
return new Object[] {"true", "false"};
return new Object[][] {
new Object[] {"true", "true"},
new Object[] {"false", "true"},
new Object[] {"true", "false"},
new Object[] {"false", "false"}
};
}

@Rule public TemporaryFolder temp = new TemporaryFolder();

private final String snapshotIdInheritanceEnabled;
private final String useCaching;
private String tableLocation = null;

public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) {
public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled, String useCaching) {
this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
this.useCaching = useCaching;
}

@Before
Expand All @@ -109,6 +116,7 @@ public void testRewriteManifestsEmptyTable() throws IOException {
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.execute();

Expand Down Expand Up @@ -141,7 +149,11 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
SparkActions actions = SparkActions.get();

RewriteManifests.Result result =
actions.rewriteManifests(table).rewriteIf(manifest -> true).execute();
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();

Assert.assertEquals(
"Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
Expand Down Expand Up @@ -281,7 +293,11 @@ public void testRewriteSmallManifestsPartitionedTable() {
.commit();

RewriteManifests.Result result =
actions.rewriteManifests(table).rewriteIf(manifest -> true).execute();
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();

Assert.assertEquals(
"Action should rewrite 4 manifests", 4, Iterables.size(result.rewrittenManifests()));
Expand Down Expand Up @@ -354,6 +370,7 @@ public void testRewriteImportedManifests() throws IOException {
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.execute();

Expand Down Expand Up @@ -404,6 +421,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException {
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.execute();

Expand Down Expand Up @@ -451,7 +469,7 @@ public void testRewriteManifestsWithPredicate() throws IOException {

SparkActions actions = SparkActions.get();

// rewrite only the first manifest without caching
// rewrite only the first manifest
RewriteManifests.Result result =
actions
.rewriteManifests(table)
Expand All @@ -460,7 +478,7 @@ public void testRewriteManifestsWithPredicate() throws IOException {
(manifest.path().equals(manifests.get(0).path())
|| (manifest.path().equals(manifests.get(1).path()))))
.stagingLocation(temp.newFolder().toString())
.option("use-caching", "false")
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();

Assert.assertEquals(
Expand Down Expand Up @@ -519,7 +537,11 @@ public void testRewriteSmallManifestsNonPartitionedV2Table() {
Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());

SparkActions actions = SparkActions.get();
RewriteManifests.Result result = actions.rewriteManifests(table).execute();
RewriteManifests.Result result =
actions
.rewriteManifests(table)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();
Assert.assertEquals(
"Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
Assert.assertEquals(
Expand Down

0 comments on commit 7b6d7da

Please sign in to comment.