Spark 3.5: Don't cache or reuse manifest entries while rewriting metadata by default (#8935)

The action for rewriting manifests caches the manifest entry DataFrame or does an extra shuffle in order to avoid reading the actual manifest files twice. We did this assuming it would improve performance. However, caching performs poorly for larger tables as it requires substantial cluster resources. In addition, the round-robin repartition is expensive because the entries must be written to disk; that extra write is actually more expensive than the extra read required for the range-based shuffle of manifest entries.

Therefore, this change disables caching by default and removes the optional round-robin repartition step. Instead, we read the manifests twice (a step that is distributed and scales well even for tables with very large metadata). The new approach should be both faster and more robust.
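Users who still want the previous behavior can opt back into caching per invocation through the existing "use-caching" option. A minimal sketch, assuming the standard Iceberg package layout and a previously loaded Iceberg Table named table:

  import org.apache.iceberg.Table;
  import org.apache.iceberg.actions.RewriteManifests;
  import org.apache.iceberg.spark.actions.RewriteManifestsSparkAction;
  import org.apache.iceberg.spark.actions.SparkActions;

  // Caching of the manifest entry dataset is now off by default
  // (USE_CACHING_DEFAULT = false); pass "use-caching" explicitly to restore
  // the old behavior for a single run of the action.
  RewriteManifests.Result result =
      SparkActions.get()
          .rewriteManifests(table)
          .option(RewriteManifestsSparkAction.USE_CACHING, "true")
          .execute();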
aokolnychyi authored Oct 30, 2023
1 parent a661663 commit fceea89
Showing 2 changed files with 32 additions and 19 deletions.
@@ -56,7 +56,6 @@
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
@@ -65,7 +64,6 @@
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,7 +80,7 @@ public class RewriteManifestsSparkAction
extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> implements RewriteManifests {

public static final String USE_CACHING = "use-caching";
public static final boolean USE_CACHING_DEFAULT = true;
public static final boolean USE_CACHING_DEFAULT = false;

private static final Logger LOG = LoggerFactory.getLogger(RewriteManifestsSparkAction.class);

@@ -277,16 +275,9 @@ private List<ManifestFile> writeManifestsForPartitionedTable(
}

private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
Dataset<T> reusableDS;
boolean useCaching =
PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
if (useCaching) {
reusableDS = ds.cache();
} else {
int parallelism = SQLConf.get().numShufflePartitions();
reusableDS =
ds.repartition(parallelism).map((MapFunction<T, T>) value -> value, ds.exprEnc());
}
Dataset<T> reusableDS = useCaching ? ds.cache() : ds;

try {
return func.apply(reusableDS);
@@ -75,18 +75,25 @@ public class TestRewriteManifestsAction extends SparkTestBase {
optional(2, "c2", Types.StringType.get()),
optional(3, "c3", Types.StringType.get()));

@Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}")
@Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}, useCaching = {1}")
public static Object[] parameters() {
return new Object[] {"true", "false"};
return new Object[][] {
new Object[] {"true", "true"},
new Object[] {"false", "true"},
new Object[] {"true", "false"},
new Object[] {"false", "false"}
};
}

@Rule public TemporaryFolder temp = new TemporaryFolder();

private final String snapshotIdInheritanceEnabled;
private final String useCaching;
private String tableLocation = null;

public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) {
public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled, String useCaching) {
this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
this.useCaching = useCaching;
}

@Before
@@ -109,6 +116,7 @@ public void testRewriteManifestsEmptyTable() throws IOException {
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.execute();

@@ -141,7 +149,11 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
SparkActions actions = SparkActions.get();

RewriteManifests.Result result =
actions.rewriteManifests(table).rewriteIf(manifest -> true).execute();
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();

Assert.assertEquals(
"Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
@@ -281,7 +293,11 @@ public void testRewriteSmallManifestsPartitionedTable() {
.commit();

RewriteManifests.Result result =
actions.rewriteManifests(table).rewriteIf(manifest -> true).execute();
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();

Assert.assertEquals(
"Action should rewrite 4 manifests", 4, Iterables.size(result.rewrittenManifests()));
@@ -354,6 +370,7 @@ public void testRewriteImportedManifests() throws IOException {
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.execute();

@@ -404,6 +421,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException {
actions
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.stagingLocation(temp.newFolder().toString())
.execute();

@@ -451,7 +469,7 @@ public void testRewriteManifestsWithPredicate() throws IOException {

SparkActions actions = SparkActions.get();

// rewrite only the first manifest without caching
// rewrite only the first manifest
RewriteManifests.Result result =
actions
.rewriteManifests(table)
@@ -460,7 +478,7 @@
(manifest.path().equals(manifests.get(0).path())
|| (manifest.path().equals(manifests.get(1).path()))))
.stagingLocation(temp.newFolder().toString())
.option("use-caching", "false")
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();

Assert.assertEquals(
@@ -519,7 +537,11 @@ public void testRewriteSmallManifestsNonPartitionedV2Table() {
Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());

SparkActions actions = SparkActions.get();
RewriteManifests.Result result = actions.rewriteManifests(table).execute();
RewriteManifests.Result result =
actions
.rewriteManifests(table)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();
Assert.assertEquals(
"Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
Assert.assertEquals(
