Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spec: snapshot_id is optional for V1 #8704

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,6 @@ public RewriteManifests addManifest(ManifestFile manifest) {
Preconditions.checkArgument(!manifest.hasAddedFiles(), "Cannot add manifest with added files");
Preconditions.checkArgument(
!manifest.hasDeletedFiles(), "Cannot add manifest with deleted files");
Preconditions.checkArgument(
manifest.snapshotId() == null || manifest.snapshotId() == -1,
"Snapshot id must be assigned during commit");
Preconditions.checkArgument(
manifest.sequenceNumber() == -1, "Sequence must be assigned during commit");

Expand Down
3 changes: 0 additions & 3 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,6 @@ public FastAppend appendManifest(ManifestFile manifest) {
!manifest.hasExistingFiles(), "Cannot append manifest with existing files");
Preconditions.checkArgument(
!manifest.hasDeletedFiles(), "Cannot append manifest with deleted files");
Preconditions.checkArgument(
manifest.snapshotId() == null || manifest.snapshotId() == -1,
"Snapshot id must be assigned during commit");
Preconditions.checkArgument(
manifest.sequenceNumber() == -1, "Sequence number must be assigned during commit");

Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ public static ManifestReader<DataFile> read(
* @param spec {@link PartitionSpec} used to produce {@link DataFile} partition tuples
* @param outputFile the destination file location
* @return a manifest writer
* @deprecated a V1 manifest requires a non-null snapshot ID, which this overload cannot supply;
*     use {@link ManifestFiles#write(int, PartitionSpec, OutputFile, Long)} instead
*/
public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outputFile) {
return write(1, spec, outputFile, null);
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/ManifestWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAp
private final OutputFile file;
private final int specId;
private final FileAppender<ManifestEntry<F>> writer;
private final Long snapshotId;
protected final Long snapshotId;
private final GenericManifestEntry<F> reused;
private final PartitionSummary stats;

Expand Down Expand Up @@ -291,11 +291,16 @@ static class V1Writer extends ManifestWriter<DataFile> {

/**
 * Creates a writer for V1 manifests.
 *
 * @param spec partition spec whose partition type is used to wrap manifest entries
 * @param file destination file for the manifest
 * @param snapshotId snapshot ID for this writer; must not be null
 * @throws NullPointerException if {@code snapshotId} is null
 */
V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
  // Validate before delegating to the superclass constructor, so that no writer state is set up
  // for an argument that is about to be rejected. checkNotNull returns its argument.
  super(spec, file, Preconditions.checkNotNull(snapshotId, "SnapshotId cannot be null"));
  this.entryWrapper = new V1Metadata.IndexedManifestEntry(spec.partitionType());
}

/**
 * Prepares an entry for writing: if the entry carries no snapshot ID and this writer has one,
 * the writer's snapshot ID is set on the entry; the entry is then wrapped by the V1 entry
 * wrapper.
 *
 * @param entry the manifest entry about to be written
 * @return the result of wrapping {@code entry}
 */
@Override
protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
  boolean entryLacksSnapshotId = entry.snapshotId() == null;
  if (entryLacksSnapshotId && this.snapshotId != null) {
    // only fill in a missing ID; an ID already present on the entry is left untouched
    entry.setSnapshotId(this.snapshotId);
  }

  return entryWrapper.wrap(entry);
}

Expand Down
3 changes: 0 additions & 3 deletions core/src/main/java/org/apache/iceberg/MergeAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ public AppendFiles appendManifest(ManifestFile manifest) {
!manifest.hasExistingFiles(), "Cannot append manifest with existing files");
Preconditions.checkArgument(
!manifest.hasDeletedFiles(), "Cannot append manifest with deleted files");
Preconditions.checkArgument(
manifest.snapshotId() == null || manifest.snapshotId() == -1,
"Snapshot id must be assigned during commit");
Preconditions.checkArgument(
manifest.sequenceNumber() == -1, "Sequence must be assigned during commit");
add(manifest);
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/V1Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ static Schema wrapFileSchema(Types.StructType fileSchema) {
// this is used to build projection schemas
return new Schema(
ManifestEntry.STATUS,
ManifestEntry.SNAPSHOT_ID,
ManifestEntry.SNAPSHOT_ID.asRequired(),
required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema));
}

Expand Down
46 changes: 27 additions & 19 deletions core/src/test/java/org/apache/iceberg/TableTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,12 @@ public TableMetadata readMetadata() {
}

ManifestFile writeManifest(DataFile... files) throws IOException {
return writeManifest(null, files);
Long snapshotId = null;
if (table.ops().current().formatVersion() == 1) {
snapshotId = table.ops().newSnapshotId();
}

return writeManifest(snapshotId, files);
}

ManifestFile writeManifest(Long snapshotId, DataFile... files) throws IOException {
Expand All @@ -242,19 +247,22 @@ ManifestFile writeManifest(Long snapshotId, DataFile... files) throws IOExceptio

ManifestWriter<DataFile> writer =
ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
try {
try (ManifestWriter<DataFile> writerRef = writer) {
for (DataFile file : files) {
writer.add(file);
writerRef.add(file);
}
} finally {
writer.close();
}

return writer.toManifestFile();
}

ManifestFile writeManifest(String fileName, ManifestEntry<?>... entries) throws IOException {
return writeManifest(null, fileName, entries);
Long snapshotId = null;
if (table.ops().current().formatVersion() == 1) {
snapshotId = table.ops().newSnapshotId();
}

return writeManifest(snapshotId, fileName, entries);
}

ManifestFile writeManifest(Long snapshotId, ManifestEntry<?>... entries) throws IOException {
Expand All @@ -279,12 +287,10 @@ <F extends ContentFile<F>> ManifestFile writeManifest(
ManifestFiles.writeDeleteManifest(
formatVersion, table.spec(), outputFile, snapshotId);
}
try {
try (ManifestWriter<F> writerRef = writer) {
for (ManifestEntry<?> entry : entries) {
writer.addEntry((ManifestEntry<F>) entry);
writerRef.addEntry((ManifestEntry<F>) entry);
}
} finally {
writer.close();
}

return writer.toManifestFile();
Expand All @@ -297,13 +303,12 @@ ManifestFile writeDeleteManifest(int newFormatVersion, Long snapshotId, DeleteFi
FileFormat.AVRO.addExtension(temp.newFile().toString()));
ManifestWriter<DeleteFile> writer =
ManifestFiles.writeDeleteManifest(newFormatVersion, SPEC, manifestFile, snapshotId);
try {
try (ManifestWriter<DeleteFile> writerRef = writer) {
for (DeleteFile deleteFile : deleteFiles) {
writer.add(deleteFile);
writerRef.add(deleteFile);
}
} finally {
writer.close();
}

return writer.toManifestFile();
}

Expand All @@ -312,14 +317,17 @@ ManifestFile writeManifestWithName(String name, DataFile... files) throws IOExce
Assert.assertTrue(manifestFile.delete());
OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());

Long snapshotId = null;
if (formatVersion == 1) {
snapshotId = table.ops().newSnapshotId();
}

ManifestWriter<DataFile> writer =
ManifestFiles.write(formatVersion, table.spec(), outputFile, null);
try {
ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
try (ManifestWriter<DataFile> writerRef = writer) {
for (DataFile file : files) {
writer.add(file);
writerRef.add(file);
}
} finally {
writer.close();
}

return writer.toManifestFile();
Expand Down
8 changes: 6 additions & 2 deletions core/src/test/java/org/apache/iceberg/TestFastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -387,14 +387,18 @@ public void testInvalidAppendManifest() throws IOException {
Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());

ManifestFile manifestWithExistingFiles =
writeManifest("manifest-file-1.avro", manifestEntry(Status.EXISTING, null, FILE_A));
writeManifest(
"manifest-file-1.avro",
manifestEntry(Status.EXISTING, table.ops().newSnapshotId(), FILE_A));
Assertions.assertThatThrownBy(
() -> table.newFastAppend().appendManifest(manifestWithExistingFiles).commit())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot append manifest with existing files");

ManifestFile manifestWithDeletedFiles =
writeManifest("manifest-file-2.avro", manifestEntry(Status.DELETED, null, FILE_A));
writeManifest(
"manifest-file-2.avro",
manifestEntry(Status.DELETED, table.ops().newSnapshotId(), FILE_A));
Assertions.assertThatThrownBy(
() -> table.newFastAppend().appendManifest(manifestWithDeletedFiles).commit())
.isInstanceOf(IllegalArgumentException.class)
Expand Down
9 changes: 0 additions & 9 deletions core/src/test/java/org/apache/iceberg/TestManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
Expand Down Expand Up @@ -74,14 +73,6 @@ public void testReaderWithFilterWithoutSelect() throws IOException {
}
}

@Test
public void testInvalidUsage() throws IOException {
ManifestFile manifest = writeManifest(FILE_A, FILE_B);
Assertions.assertThatThrownBy(() -> ManifestFiles.read(manifest, FILE_IO))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot read from ManifestFile with null (unassigned) snapshot ID");
}

@Test
public void testManifestReaderWithPartitionMetadata() throws IOException {
ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, FILE_A));
Expand Down
76 changes: 44 additions & 32 deletions core/src/test/java/org/apache/iceberg/TestManifestWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,29 @@ public TestManifestWriter(int formatVersion) {
private static final int FILE_SIZE_CHECK_ROWS_DIVISOR = 250;
private static final long SMALL_FILE_SIZE = 10L;

/**
 * Returns the snapshot ID to use when writing test manifests: a value from {@code
 * table.ops().newSnapshotId()} when the table's current metadata is format version 1, otherwise
 * {@code null}.
 */
private Long getSnapshotId() {
  boolean isV1Table = table.ops().current().formatVersion() == 1;
  return isV1Table ? table.ops().newSnapshotId() : null;
}

@Test
public void testManifestStats() throws IOException {
Long snapshotId = getSnapshotId();

ManifestFile manifest =
writeManifest(
"manifest.avro",
manifestEntry(Status.ADDED, null, newFile(10)),
manifestEntry(Status.ADDED, null, newFile(20)),
manifestEntry(Status.ADDED, null, newFile(5)),
manifestEntry(Status.ADDED, null, newFile(5)),
manifestEntry(Status.EXISTING, null, newFile(15)),
manifestEntry(Status.EXISTING, null, newFile(10)),
manifestEntry(Status.EXISTING, null, newFile(1)),
manifestEntry(Status.DELETED, null, newFile(5)),
manifestEntry(Status.DELETED, null, newFile(2)));
manifestEntry(Status.ADDED, snapshotId, newFile(10)),
manifestEntry(Status.ADDED, snapshotId, newFile(20)),
manifestEntry(Status.ADDED, snapshotId, newFile(5)),
manifestEntry(Status.ADDED, snapshotId, newFile(5)),
manifestEntry(Status.EXISTING, snapshotId, newFile(15)),
manifestEntry(Status.EXISTING, snapshotId, newFile(10)),
manifestEntry(Status.EXISTING, snapshotId, newFile(1)),
manifestEntry(Status.DELETED, snapshotId, newFile(5)),
manifestEntry(Status.DELETED, snapshotId, newFile(2)));

Assert.assertTrue("Added files should be present", manifest.hasAddedFiles());
Assert.assertEquals("Added files count should match", 4, (int) manifest.addedFilesCount());
Expand All @@ -81,12 +90,14 @@ public void testManifestStats() throws IOException {

@Test
public void testManifestPartitionStats() throws IOException {
Long snapshotId = getSnapshotId();

ManifestFile manifest =
writeManifest(
"manifest.avro",
manifestEntry(Status.ADDED, null, newFile(10, TestHelpers.Row.of(1))),
manifestEntry(Status.EXISTING, null, newFile(15, TestHelpers.Row.of(2))),
manifestEntry(Status.DELETED, null, newFile(2, TestHelpers.Row.of(3))));
manifestEntry(Status.ADDED, snapshotId, newFile(10, TestHelpers.Row.of(1))),
manifestEntry(Status.EXISTING, snapshotId, newFile(15, TestHelpers.Row.of(2))),
manifestEntry(Status.DELETED, snapshotId, newFile(2, TestHelpers.Row.of(3))));

List<ManifestFile.PartitionFieldSummary> partitions = manifest.partitions();
Assert.assertEquals("Partition field summaries count should match", 1, partitions.size());
Expand Down Expand Up @@ -249,34 +260,35 @@ public void testRollingDeleteManifestWriterNoRecords() throws IOException {

@Test
public void testRollingManifestWriterSplitFiles() throws IOException {
RollingManifestWriter<DataFile> writer = newRollingWriteManifest(SMALL_FILE_SIZE);

int[] addedFileCounts = new int[3];
int[] existingFileCounts = new int[3];
int[] deletedFileCounts = new int[3];
long[] addedRowCounts = new long[3];
long[] existingRowCounts = new long[3];
long[] deletedRowCounts = new long[3];

for (int i = 0; i < FILE_SIZE_CHECK_ROWS_DIVISOR * 3; i++) {
int type = i % 3;
int fileIndex = i / FILE_SIZE_CHECK_ROWS_DIVISOR;
if (type == 0) {
writer.add(newFile(i));
addedFileCounts[fileIndex] += 1;
addedRowCounts[fileIndex] += i;
} else if (type == 1) {
writer.existing(newFile(i), 1, 1, null);
existingFileCounts[fileIndex] += 1;
existingRowCounts[fileIndex] += i;
} else {
writer.delete(newFile(i), 1, null);
deletedFileCounts[fileIndex] += 1;
deletedRowCounts[fileIndex] += i;
RollingManifestWriter<DataFile> writer = newRollingWriteManifest(SMALL_FILE_SIZE);

try (RollingManifestWriter<DataFile> writerRef = writer) {
for (int i = 0; i < FILE_SIZE_CHECK_ROWS_DIVISOR * 3; i++) {
int type = i % 3;
int fileIndex = i / FILE_SIZE_CHECK_ROWS_DIVISOR;
if (type == 0) {
DataFile f = newFile(i);
writerRef.add(f);
addedFileCounts[fileIndex] += 1;
addedRowCounts[fileIndex] += i;
} else if (type == 1) {
writerRef.existing(newFile(i), 1, 1, null);
existingFileCounts[fileIndex] += 1;
existingRowCounts[fileIndex] += i;
} else {
writerRef.delete(newFile(i), 1, null);
deletedFileCounts[fileIndex] += 1;
deletedRowCounts[fileIndex] += i;
}
}
}

writer.close();
List<ManifestFile> manifestFiles = writer.toManifestFiles();
Assertions.assertThat(manifestFiles.size()).isEqualTo(3);

Expand Down Expand Up @@ -414,7 +426,7 @@ private RollingManifestWriter<DataFile> newRollingWriteManifest(long targetFileS
return new RollingManifestWriter<>(
() -> {
OutputFile newManifestFile = newManifestFile();
return ManifestFiles.write(formatVersion, SPEC, newManifestFile, null);
return ManifestFiles.write(formatVersion, SPEC, newManifestFile, getSnapshotId());
},
targetFileSize);
}
Expand Down
Loading