Skip to content

Commit

Permalink
Core, Spark: Avoid manifest copies when importing data to V2 tables
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi committed Nov 1, 2023
1 parent da392f2 commit 9b2b93a
Show file tree
Hide file tree
Showing 17 changed files with 766 additions and 681 deletions.
19 changes: 10 additions & 9 deletions api/src/main/java/org/apache/iceberg/AppendFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,17 @@ public interface AppendFiles extends SnapshotUpdate<AppendFiles> {
* <p>The manifest must contain only appended files. All files in the manifest will be appended to
* the table in the snapshot created by this update.
*
* <p>By default, the manifest will be rewritten to assign all entries this update's snapshot ID.
* In that case, it is always the responsibility of the caller to manage the lifecycle of the
* original manifest.
* <p>The manifest will be used directly if snapshot ID inheritance is enabled (all tables with
* the format version &gt; 1 or if the inheritance is enabled explicitly via table properties).
* Otherwise, the manifest will be rewritten to assign all entries this update's snapshot ID.
*
* <p>If manifest entries are allowed to inherit the snapshot ID assigned on commit, the manifest
* should never be deleted manually if the commit succeeds as it will become part of the table
* metadata and will be cleaned up on expiry. If the manifest gets merged with others while
* preparing a new snapshot, it will be deleted automatically if this operation is successful. If
* the commit fails, the manifest will never be deleted and it is up to the caller whether to
* delete or reuse it.
* <p>If the manifest is rewritten, it is always the responsibility of the caller to manage the
* lifecycle of the original manifest. If manifest entries are allowed to inherit the snapshot ID
* assigned on commit, the manifest should never be deleted manually if the commit succeeds as it
* will become part of the table metadata and will be cleaned upon expiry. If the manifest gets
* merged with others while preparing a new snapshot, it will be deleted automatically if this
* operation is successful. If the commit fails, the manifest will never be deleted, and it is up
* to the caller whether to delete or reuse it.
*
* @param file a manifest file
* @return this for method chaining
Expand Down
10 changes: 1 addition & 9 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT;

import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand All @@ -44,7 +41,6 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final String tableName;
private final TableOperations ops;
private final PartitionSpec spec;
private final boolean snapshotIdInheritanceEnabled;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final List<DataFile> newFiles = Lists.newArrayList();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
Expand All @@ -57,10 +53,6 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
this.tableName = tableName;
this.ops = ops;
this.spec = ops.current().spec();
this.snapshotIdInheritanceEnabled =
ops.current()
.propertyAsBoolean(
SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
}

@Override
Expand Down Expand Up @@ -115,7 +107,7 @@ public FastAppend appendManifest(ManifestFile manifest) {
Preconditions.checkArgument(
manifest.sequenceNumber() == -1, "Sequence number must be assigned during commit");

if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) {
if (canInheritSnapshotId() && manifest.snapshotId() == null) {
summaryBuilder.addedManifest(manifest);
appendManifests.add(manifest);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT;
import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -80,7 +78,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private final ManifestFilterManager<DataFile> filterManager;
private final ManifestMergeManager<DeleteFile> deleteMergeManager;
private final ManifestFilterManager<DeleteFile> deleteFilterManager;
private final boolean snapshotIdInheritanceEnabled;

// update data
private final List<DataFile> newDataFiles = Lists.newArrayList();
Expand Down Expand Up @@ -123,10 +120,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
this.deleteMergeManager =
new DeleteFileMergeManager(targetSizeBytes, minCountToMerge, mergeEnabled);
this.deleteFilterManager = new DeleteFileFilterManager();
this.snapshotIdInheritanceEnabled =
ops.current()
.propertyAsBoolean(
SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
}

@Override
Expand Down Expand Up @@ -271,7 +264,7 @@ private void setDataSpec(DataFile file) {
protected void add(ManifestFile manifest) {
Preconditions.checkArgument(
manifest.content() == ManifestContent.DATA, "Cannot append delete manifest: %s", manifest);
if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) {
if (canInheritSnapshotId() && manifest.snapshotId() == null) {
appendedManifestsSummary.addedManifest(manifest);
appendManifests.add(manifest);
} else {
Expand Down
28 changes: 26 additions & 2 deletions core/src/test/java/org/apache/iceberg/TestFastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.stream.Collectors;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.assertj.core.api.Assertions;
Expand Down Expand Up @@ -83,6 +84,13 @@ public void testEmptyTableAppendManifest() throws IOException {

validateSnapshot(base.currentSnapshot(), snap, 1, FILE_A, FILE_B);

ManifestFile committedManifest = Iterables.getOnlyElement(snap.allManifests(FILE_IO));
if (formatVersion == 1) {
Assertions.assertThat(committedManifest.path()).isNotEqualTo(manifest.path());
} else {
Assertions.assertThat(committedManifest.path()).isEqualTo(manifest.path());
}

// validate that the metadata summary is correct when using appendManifest
Assert.assertEquals(
"Summary metadata should include 2 added files",
Expand Down Expand Up @@ -126,6 +134,12 @@ public void testEmptyTableAppendFilesAndManifest() throws IOException {
ids(commitId, commitId),
files(FILE_A, FILE_B));

if (formatVersion == 1) {
Assertions.assertThat(snap.allManifests(FILE_IO).get(1).path()).isNotEqualTo(manifest.path());
} else {
Assertions.assertThat(snap.allManifests(FILE_IO).get(1).path()).isEqualTo(manifest.path());
}

V2Assert.assertEquals("Snapshot sequence number should be 1", 1, snap.sequenceNumber());
V2Assert.assertEquals(
"Last sequence number should be 1", 1, readMetadata().lastSequenceNumber());
Expand Down Expand Up @@ -257,12 +271,21 @@ public void testAppendManifestCleanup() throws IOException {
Snapshot pending = append.apply();
ManifestFile newManifest = pending.allManifests(FILE_IO).get(0);
Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
if (formatVersion == 1) {
Assertions.assertThat(newManifest.path()).isNotEqualTo(manifest.path());
} else {
Assertions.assertThat(newManifest.path()).isEqualTo(manifest.path());
}

Assertions.assertThatThrownBy(append::commit)
.isInstanceOf(CommitFailedException.class)
.hasMessage("Injected failure");

Assert.assertFalse("Should clean up new manifest", new File(newManifest.path()).exists());
if (formatVersion == 1) {
Assertions.assertThat(new File(newManifest.path())).doesNotExist();
} else {
Assertions.assertThat(new File(newManifest.path())).exists();
}
}

@Test
Expand Down Expand Up @@ -327,7 +350,8 @@ public void testAppendManifestWithSnapshotIdInheritance() throws IOException {

Snapshot snapshot = table.currentSnapshot();
List<ManifestFile> manifests = table.currentSnapshot().allManifests(FILE_IO);
Assert.assertEquals("Should have 1 committed manifest", 1, manifests.size());
ManifestFile committedManifest = Iterables.getOnlyElement(manifests);
Assertions.assertThat(committedManifest.path()).isEqualTo(manifest.path());

validateManifestEntries(
manifests.get(0),
Expand Down
80 changes: 61 additions & 19 deletions core/src/test/java/org/apache/iceberg/TestMergeAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.stream.Collectors;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -107,14 +108,17 @@ public void testEmptyTableAppendManifest() throws IOException {
"Last sequence number should be 0", 0, table.ops().current().lastSequenceNumber());
V2Assert.assertEquals(
"Last sequence number should be 1", 1, table.ops().current().lastSequenceNumber());
Assert.assertEquals(
"Should create 1 manifest for initial write",
1,
committedSnapshot.allManifests(table.io()).size());
List<ManifestFile> manifests = committedSnapshot.allManifests(table.io());
ManifestFile committedManifest = Iterables.getOnlyElement(manifests);
if (formatVersion == 1) {
Assertions.assertThat(committedManifest.path()).isNotEqualTo(manifest.path());
} else {
Assertions.assertThat(committedManifest.path()).isEqualTo(manifest.path());
}

long snapshotId = committedSnapshot.snapshotId();
validateManifest(
committedSnapshot.allManifests(table.io()).get(0),
committedManifest,
dataSeqs(1L, 1L),
fileSeqs(1L, 1L),
ids(snapshotId, snapshotId),
Expand Down Expand Up @@ -155,16 +159,25 @@ public void testEmptyTableAppendFilesAndManifest() throws IOException {

long snapshotId = committedSnapshot.snapshotId();

ManifestFile committedManifest1 = committedSnapshot.allManifests(table.io()).get(0);
ManifestFile committedManifest2 = committedSnapshot.allManifests(table.io()).get(1);

if (formatVersion == 1) {
Assertions.assertThat(committedManifest2.path()).isNotEqualTo(manifest.path());
} else {
Assertions.assertThat(committedManifest2.path()).isEqualTo(manifest.path());
}

validateManifest(
committedSnapshot.allManifests(table.io()).get(0),
committedManifest1,
dataSeqs(1L, 1L),
fileSeqs(1L, 1L),
ids(snapshotId, snapshotId),
files(FILE_C, FILE_D),
statuses(Status.ADDED, Status.ADDED));

validateManifest(
committedSnapshot.allManifests(table.io()).get(1),
committedManifest2,
dataSeqs(1L, 1L),
fileSeqs(1L, 1L),
ids(snapshotId, snapshotId),
Expand Down Expand Up @@ -229,11 +242,13 @@ public void testMergeWithAppendFilesAndManifest() throws IOException {

long snapshotId = committedSnapshot.snapshotId();

Assert.assertEquals(
"Should create 1 merged manifest", 1, committedSnapshot.allManifests(table.io()).size());
List<ManifestFile> manifests = committedSnapshot.allManifests(table.io());
ManifestFile committedManifest = Iterables.getOnlyElement(manifests);

Assertions.assertThat(committedManifest.path()).isNotEqualTo(manifest.path());

validateManifest(
committedSnapshot.allManifests(table.io()).get(0),
committedManifest,
dataSeqs(1L, 1L, 1L, 1L),
fileSeqs(1L, 1L, 1L, 1L),
ids(snapshotId, snapshotId, snapshotId, snapshotId),
Expand Down Expand Up @@ -316,7 +331,7 @@ public void testManifestMergeMinCount() throws IOException {
Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
Assert.assertEquals("Last sequence number should be 0", 0, base.lastSequenceNumber());

ManifestFile manifest = writeManifest(FILE_A);
ManifestFile manifest = writeManifestWithName("FILE_A", FILE_A);
ManifestFile manifest2 = writeManifestWithName("FILE_C", FILE_C);
ManifestFile manifest3 = writeManifestWithName("FILE_D", FILE_D);
Snapshot snap1 =
Expand Down Expand Up @@ -355,6 +370,11 @@ public void testManifestMergeMinCount() throws IOException {
files(FILE_C, FILE_D),
statuses(Status.ADDED, Status.ADDED));

// produce new manifests as the old ones could have been compacted
manifest = writeManifestWithName("FILE_A_S2", FILE_A);
manifest2 = writeManifestWithName("FILE_C_S2", FILE_C);
manifest3 = writeManifestWithName("FILE_D_S2", FILE_D);

Snapshot snap2 =
commit(
table,
Expand Down Expand Up @@ -546,29 +566,41 @@ public void testManifestDoNotMergeMinCount() throws IOException {
V2Assert.assertEquals(
"Last sequence number should be 1", 1, table.ops().current().lastSequenceNumber());

Assert.assertEquals(
"Should contain 3 merged manifest after 1st write write",
3,
committed.allManifests(table.io()).size());
List<ManifestFile> manifests = committed.allManifests(table.io());
Assertions.assertThat(manifests).hasSize(3);

ManifestFile committedManifest = manifests.get(0);
ManifestFile committedManifest2 = manifests.get(1);
ManifestFile committedManifest3 = manifests.get(2);

long snapshotId = committed.snapshotId();

if (formatVersion == 1) {
Assertions.assertThat(committedManifest.path()).isNotEqualTo(manifest.path());
Assertions.assertThat(committedManifest2.path()).isNotEqualTo(manifest2.path());
Assertions.assertThat(committedManifest3.path()).isNotEqualTo(manifest3.path());
} else {
Assertions.assertThat(committedManifest.path()).isEqualTo(manifest.path());
Assertions.assertThat(committedManifest2.path()).isEqualTo(manifest2.path());
Assertions.assertThat(committedManifest3.path()).isEqualTo(manifest3.path());
}

validateManifest(
committed.allManifests(table.io()).get(0),
committedManifest,
dataSeqs(1L, 1L),
fileSeqs(1L, 1L),
ids(snapshotId, snapshotId),
files(FILE_A, FILE_B),
statuses(Status.ADDED, Status.ADDED));
validateManifest(
committed.allManifests(table.io()).get(1),
committedManifest2,
dataSeqs(1L),
fileSeqs(1L),
ids(snapshotId),
files(FILE_C),
statuses(Status.ADDED));
validateManifest(
committed.allManifests(table.io()).get(2),
committedManifest3,
dataSeqs(1L),
fileSeqs(1L),
ids(snapshotId),
Expand Down Expand Up @@ -981,6 +1013,11 @@ public void testAppendManifestCleanup() throws IOException {
Snapshot pending = apply(append, branch);
ManifestFile newManifest = pending.allManifests(table.io()).get(0);
Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
if (formatVersion == 1) {
Assertions.assertThat(newManifest.path()).isNotEqualTo(manifest.path());
} else {
Assertions.assertThat(newManifest.path()).isEqualTo(manifest.path());
}

Assertions.assertThatThrownBy(() -> commit(table, append, branch))
.isInstanceOf(CommitFailedException.class)
Expand All @@ -990,7 +1027,11 @@ public void testAppendManifestCleanup() throws IOException {
V1Assert.assertEquals(
"Table should end with last-sequence-number 0", 0, readMetadata().lastSequenceNumber());

Assert.assertFalse("Should clean up new manifest", new File(newManifest.path()).exists());
if (formatVersion == 1) {
Assertions.assertThat(new File(newManifest.path())).doesNotExist();
} else {
Assertions.assertThat(new File(newManifest.path())).exists();
}
}

@Test
Expand Down Expand Up @@ -1084,6 +1125,7 @@ public void testAppendManifestWithSnapshotIdInheritance() throws IOException {
List<ManifestFile> manifests = snapshot.allManifests(table.io());
Assert.assertEquals("Should have 1 committed manifest", 1, manifests.size());
ManifestFile manifestFile = snapshot.allManifests(table.io()).get(0);
Assertions.assertThat(manifestFile.path()).isEqualTo(manifest.path());
validateManifest(
manifestFile,
dataSeqs(1L, 1L),
Expand Down
7 changes: 6 additions & 1 deletion core/src/test/java/org/apache/iceberg/TestTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -504,7 +505,11 @@ public void testTransactionRetryMergeCleanup() {
}

@Test
public void testTransactionRetryAndAppendManifests() throws Exception {
public void testTransactionRetryAndAppendManifestsWithoutSnapshotIdInheritance()
throws Exception {
// this test assumes append manifests are rewritten, which only happens in V1 tables
Assumptions.assumeThat(formatVersion).isEqualTo(1);

// use only one retry and aggressively merge manifests
table
.updateProperties()
Expand Down
4 changes: 2 additions & 2 deletions docs/spark-procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -649,10 +649,10 @@ Warning : Files added by this method can be physically deleted by Iceberg operat
| Output Name | Type | Description |
|---------------------------|------|---------------------------------------------------|
| `added_files_count` | long | The number of files added by this command |
| `changed_partition_count` | long | The number of partitioned changed by this command |
| `changed_partition_count` | long | The number of partitioned changed by this command (if known) |

{{< hint warning >}}
changed_partition_count will be 0 when table property `compatibility.snapshot-id-inheritance.enabled` is set to true or if the table format version is > 1.
changed_partition_count will be NULL when table property `compatibility.snapshot-id-inheritance.enabled` is set to true or if the table format version is > 1.
{{< /hint >}}
#### Examples

Expand Down
Loading

0 comments on commit 9b2b93a

Please sign in to comment.