Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 3.2: Don't cache or reuse manifest entries while rewriting metadata by default #8956

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
Expand All @@ -65,7 +64,6 @@
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -82,7 +80,7 @@ public class RewriteManifestsSparkAction
extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> implements RewriteManifests {

public static final String USE_CACHING = "use-caching";
public static final boolean USE_CACHING_DEFAULT = true;
public static final boolean USE_CACHING_DEFAULT = false;

private static final Logger LOG = LoggerFactory.getLogger(RewriteManifestsSparkAction.class);

Expand Down Expand Up @@ -277,16 +275,9 @@ private List<ManifestFile> writeManifestsForPartitionedTable(
}

private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
Dataset<T> reusableDS;
boolean useCaching =
PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
if (useCaching) {
reusableDS = ds.cache();
} else {
int parallelism = SQLConf.get().numShufflePartitions();
reusableDS =
ds.repartition(parallelism).map((MapFunction<T, T>) value -> value, ds.exprEnc());
}
Dataset<T> reusableDS = useCaching ? ds.cache() : ds;

try {
return func.apply(reusableDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,25 @@ public class TestRewriteManifestsAction extends SparkTestBase {
optional(2, "c2", Types.StringType.get()),
optional(3, "c3", Types.StringType.get()));

@Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}")
@Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}, useCaching = {1}")
public static Object[] parameters() {
return new Object[] {"true", "false"};
return new Object[][] {
new Object[] {"true", "true"},
new Object[] {"false", "true"},
new Object[] {"true", "false"},
new Object[] {"false", "false"}
};
}

@Rule public TemporaryFolder temp = new TemporaryFolder();

private final String snapshotIdInheritanceEnabled;
private final String useCaching;
private String tableLocation = null;

public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) {
public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled, String useCaching) {
this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
this.useCaching = useCaching;
}

@Before
Expand All @@ -109,6 +116,7 @@ public void testRewriteManifestsEmptyTable() throws IOException {
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.execute();

Expand Down Expand Up @@ -141,7 +149,11 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
SparkActions actions = SparkActions.get();

RewriteManifests.Result result =
actions.rewriteManifests(table).rewriteIf(manifest -> true).execute();
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();

Assert.assertEquals(
"Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
Expand Down Expand Up @@ -281,7 +293,11 @@ public void testRewriteSmallManifestsPartitionedTable() {
.commit();

RewriteManifests.Result result =
actions.rewriteManifests(table).rewriteIf(manifest -> true).execute();
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();

Assert.assertEquals(
"Action should rewrite 4 manifests", 4, Iterables.size(result.rewrittenManifests()));
Expand Down Expand Up @@ -354,6 +370,7 @@ public void testRewriteImportedManifests() throws IOException {
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.execute();

Expand Down Expand Up @@ -404,6 +421,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException {
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.execute();

Expand Down Expand Up @@ -451,7 +469,7 @@ public void testRewriteManifestsWithPredicate() throws IOException {

SparkActions actions = SparkActions.get();

// rewrite only the first manifest without caching
// rewrite only the first manifest
RewriteManifests.Result result =
actions
.rewriteManifests(table)
Expand All @@ -460,7 +478,7 @@ public void testRewriteManifestsWithPredicate() throws IOException {
(manifest.path().equals(manifests.get(0).path())
|| (manifest.path().equals(manifests.get(1).path()))))
.stagingLocation(temp.newFolder().toString())
.option("use-caching", "false")
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();

Assert.assertEquals(
Expand Down Expand Up @@ -519,7 +537,11 @@ public void testRewriteSmallManifestsNonPartitionedV2Table() {
Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());

SparkActions actions = SparkActions.get();
RewriteManifests.Result result = actions.rewriteManifests(table).execute();
RewriteManifests.Result result =
actions
.rewriteManifests(table)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();
Assert.assertEquals(
"Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
Assert.assertEquals(
Expand Down