Skip to content

Commit

Permalink
Spark: Avoid extra copies of manifests while optimizing V2 tables
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi committed Oct 27, 2023
1 parent 82e0a56 commit 820e6bb
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 21 deletions.
12 changes: 7 additions & 5 deletions core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
private final TableOperations ops;
private final Map<Integer, PartitionSpec> specsById;
private final long manifestTargetSizeBytes;
private final boolean snapshotIdInheritanceEnabled;
private final int formatVersion;
private final boolean canInheritSnapshotId;

private final Set<ManifestFile> deletedManifests = Sets.newHashSet();
private final List<ManifestFile> addedManifests = Lists.newArrayList();
Expand All @@ -82,10 +83,12 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
this.manifestTargetSizeBytes =
ops.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
this.snapshotIdInheritanceEnabled =
this.formatVersion = ops.current().formatVersion();
boolean snapshotIdInheritanceEnabled =
ops.current()
.propertyAsBoolean(
SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
this.canInheritSnapshotId = formatVersion > 1 || snapshotIdInheritanceEnabled;
}

@Override
Expand Down Expand Up @@ -148,7 +151,7 @@ public RewriteManifests addManifest(ManifestFile manifest) {
Preconditions.checkArgument(
manifest.sequenceNumber() == -1, "Sequence must be assigned during commit");

if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) {
if (canInheritSnapshotId && manifest.snapshotId() == null) {
addedManifests.add(manifest);
} else {
// the manifest must be rewritten with this update's snapshot ID
Expand All @@ -160,11 +163,10 @@ public RewriteManifests addManifest(ManifestFile manifest) {
}

private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
InputFile toCopy = ops.io().newInputFile(manifest.path());
OutputFile newFile = newManifestOutput();
return ManifestFiles.copyRewriteManifest(
current.formatVersion(),
formatVersion,
manifest.partitionSpecId(),
toCopy,
specsById,
Expand Down
12 changes: 12 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -443,6 +444,14 @@ public void testBasicManifestReplacement() throws IOException {
List<ManifestFile> manifests = snapshot.allManifests(table.io());
Assert.assertEquals(3, manifests.size());

if (formatVersion == 1) {
assertThat(manifests.get(0).path()).isNotEqualTo(firstNewManifest.path());
assertThat(manifests.get(1).path()).isNotEqualTo(secondNewManifest.path());
} else {
assertThat(manifests.get(0).path()).isEqualTo(firstNewManifest.path());
assertThat(manifests.get(1).path()).isEqualTo(secondNewManifest.path());
}

validateSummary(snapshot, 1, 1, 2, 0);

validateManifestEntries(
Expand Down Expand Up @@ -499,6 +508,9 @@ public void testBasicManifestReplacementWithSnapshotIdInheritance() throws IOExc
List<ManifestFile> manifests = snapshot.allManifests(table.io());
Assert.assertEquals(3, manifests.size());

assertThat(manifests.get(0).path()).isEqualTo(firstNewManifest.path());
assertThat(manifests.get(1).path()).isEqualTo(secondNewManifest.path());

validateSummary(snapshot, 1, 1, 2, 0);

validateManifestEntries(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ private void replaceManifests(
addedManifests.forEach(rewriteManifests::addManifest);
commit(rewriteManifests);

if (!snapshotIdInheritanceEnabled) {
if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
// delete new manifests as they were rewritten before the commit
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.iceberg.ValidationHelpers.snapshotIds;
import static org.apache.iceberg.ValidationHelpers.validateDataManifest;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -75,17 +76,24 @@ public class TestRewriteManifestsAction extends SparkTestBase {
optional(2, "c2", Types.StringType.get()),
optional(3, "c3", Types.StringType.get()));

@Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}")
@Parameterized.Parameters(name = "formatVersion = {0}, snapshotIdInheritanceEnabled = {1}")
public static Object[] parameters() {
return new Object[] {"true", "false"};
return new Object[][] {
new Object[] {1, "true"},
new Object[] {1, "false"},
new Object[] {2, "true"},
new Object[] {2, "false"}
};
}

@Rule public TemporaryFolder temp = new TemporaryFolder();

private final int formatVersion;
private final String snapshotIdInheritanceEnabled;
private String tableLocation = null;

public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) {
public TestRewriteManifestsAction(int formatVersion, String snapshotIdInheritanceEnabled) {
this.formatVersion = formatVersion;
this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
}

Expand All @@ -99,6 +107,7 @@ public void setupTableLocation() throws Exception {
public void testRewriteManifestsEmptyTable() throws IOException {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand All @@ -119,6 +128,7 @@ public void testRewriteManifestsEmptyTable() throws IOException {
public void testRewriteSmallManifestsNonPartitionedTable() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -172,6 +182,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
public void testRewriteManifestsWithCommitStateUnknownException() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -238,6 +249,7 @@ public void testRewriteManifestsWithCommitStateUnknownException() {
public void testRewriteSmallManifestsPartitionedTable() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -318,6 +330,7 @@ public void testRewriteSmallManifestsPartitionedTable() {
public void testRewriteImportedManifests() throws IOException {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -373,6 +386,7 @@ public void testRewriteImportedManifests() throws IOException {
public void testRewriteLargeManifestsPartitionedTable() throws IOException {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -428,6 +442,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException {
public void testRewriteManifestsWithPredicate() throws IOException {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -495,6 +510,8 @@ public void testRewriteManifestsWithPredicate() throws IOException {

@Test
public void testRewriteSmallManifestsNonPartitionedV2Table() {
assumeThat(formatVersion).isGreaterThan(1);

PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
Table table = TABLES.create(SCHEMA, spec, properties, tableLocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ private void replaceManifests(
addedManifests.forEach(rewriteManifests::addManifest);
commit(rewriteManifests);

if (!snapshotIdInheritanceEnabled) {
if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
// delete new manifests as they were rewritten before the commit
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.iceberg.ValidationHelpers.snapshotIds;
import static org.apache.iceberg.ValidationHelpers.validateDataManifest;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -75,17 +76,24 @@ public class TestRewriteManifestsAction extends SparkTestBase {
optional(2, "c2", Types.StringType.get()),
optional(3, "c3", Types.StringType.get()));

@Parameterized.Parameters(name = "snapshotIdInheritanceEnabled = {0}")
@Parameterized.Parameters(name = "formatVersion = {0}, snapshotIdInheritanceEnabled = {1}")
public static Object[] parameters() {
return new Object[] {"true", "false"};
return new Object[][] {
new Object[] {1, "true"},
new Object[] {1, "false"},
new Object[] {2, "true"},
new Object[] {2, "false"}
};
}

@Rule public TemporaryFolder temp = new TemporaryFolder();

private final int formatVersion;
private final String snapshotIdInheritanceEnabled;
private String tableLocation = null;

public TestRewriteManifestsAction(String snapshotIdInheritanceEnabled) {
public TestRewriteManifestsAction(int formatVersion, String snapshotIdInheritanceEnabled) {
this.formatVersion = formatVersion;
this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
}

Expand All @@ -99,6 +107,7 @@ public void setupTableLocation() throws Exception {
public void testRewriteManifestsEmptyTable() throws IOException {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand All @@ -119,6 +128,7 @@ public void testRewriteManifestsEmptyTable() throws IOException {
public void testRewriteSmallManifestsNonPartitionedTable() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -172,6 +182,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
public void testRewriteManifestsWithCommitStateUnknownException() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -238,6 +249,7 @@ public void testRewriteManifestsWithCommitStateUnknownException() {
public void testRewriteSmallManifestsPartitionedTable() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -318,6 +330,7 @@ public void testRewriteSmallManifestsPartitionedTable() {
public void testRewriteImportedManifests() throws IOException {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -373,6 +386,7 @@ public void testRewriteImportedManifests() throws IOException {
public void testRewriteLargeManifestsPartitionedTable() throws IOException {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -428,6 +442,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException {
public void testRewriteManifestsWithPredicate() throws IOException {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

Expand Down Expand Up @@ -495,6 +510,8 @@ public void testRewriteManifestsWithPredicate() throws IOException {

@Test
public void testRewriteSmallManifestsNonPartitionedV2Table() {
assumeThat(formatVersion).isGreaterThan(1);

PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
Table table = TABLES.create(SCHEMA, spec, properties, tableLocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ private void replaceManifests(
addedManifests.forEach(rewriteManifests::addManifest);
commit(rewriteManifests);

if (!snapshotIdInheritanceEnabled) {
if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
// delete new manifests as they were rewritten before the commit
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
}
Expand Down
Loading

0 comments on commit 820e6bb

Please sign in to comment.