Skip to content

Commit

Permalink
Merge branch 'main' into issue-10668
Browse files Browse the repository at this point in the history
  • Loading branch information
sl255051 committed Aug 1, 2024
2 parents 339df71 + 6e7113a commit 0dc50cf
Show file tree
Hide file tree
Showing 156 changed files with 3,464 additions and 465 deletions.
10 changes: 9 additions & 1 deletion .github/ISSUE_TEMPLATE/iceberg_improvement.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,12 @@ body:
- Hive
- Other
validations:
required: false
required: false
- type: checkboxes
attributes:
label: Willingness to contribute
description: The Apache Iceberg community encourages contributions. Would you or another member of your organization be willing to contribute this improvement/feature to the Apache Iceberg codebase?
options:
- label: I can contribute this improvement/feature independently
- label: I would be willing to contribute this improvement/feature with guidance from the Iceberg community
- label: I cannot contribute this improvement/feature at this time
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ project(':iceberg-core') {
annotationProcessor libs.immutables.value
compileOnly libs.immutables.value

implementation(libs.avro.avro) {
api(libs.avro.avro) {
exclude group: 'org.tukaani' // xz compression is not supported
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private int assignFieldId() {
*/
private PartitionField recycleOrCreatePartitionField(
Pair<Integer, Transform<?, ?>> sourceTransform, String name) {
if (formatVersion == 2 && base != null) {
if (formatVersion >= 2 && base != null) {
int sourceId = sourceTransform.first();
Transform<?, ?> transform = sourceTransform.second();

Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,16 @@ protected void cleanUncommitted(Set<ManifestFile> committed) {
}
}

/**
* Cleanup after committing is disabled for FastAppend unless there are rewrittenAppendManifests.
*
* <p>Skipping the cleanup is safe in the common case because:
*
* <ol>
*   <li>Appended manifests are never rewritten
*   <li>Manifests which are written out as part of appendFile are already cleaned up between
*       commit attempts in writeNewManifests
* </ol>
*
* @return true only when rewrittenAppendManifests is not empty
*/
@Override
protected boolean cleanupAfterCommit() {
return !rewrittenAppendManifests.isEmpty();
}

private List<ManifestFile> writeNewManifests() throws IOException {
if (hasNewFiles && newManifests != null) {
newManifests.forEach(file -> deleteFile(file.path()));
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ public static ManifestWriter<DataFile> write(
return new ManifestWriter.V1Writer(spec, encryptedOutputFile, snapshotId);
case 2:
return new ManifestWriter.V2Writer(spec, encryptedOutputFile, snapshotId);
case 3:
return new ManifestWriter.V3Writer(spec, encryptedOutputFile, snapshotId);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
Expand Down Expand Up @@ -238,6 +240,8 @@ public static ManifestWriter<DeleteFile> writeDeleteManifest(
throw new IllegalArgumentException("Cannot write delete files in a v1 table");
case 2:
return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId);
case 3:
return new ManifestWriter.V3DeleteWriter(spec, outputFile, snapshotId);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
Expand Down
35 changes: 35 additions & 0 deletions core/src/main/java/org/apache/iceberg/ManifestListWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,41 @@ public long length() {
return writer.length();
}

/**
 * Manifest list writer for format version 3.
 *
 * <p>Every manifest is wrapped in a {@code V3Metadata.IndexedManifestFile} before it is written,
 * and the list itself is an Avro file that uses {@code V3Metadata.MANIFEST_LIST_SCHEMA}.
 */
static class V3Writer extends ManifestListWriter {
  private final V3Metadata.IndexedManifestFile indexedManifest;

  /**
   * @param snapshotFile output file the manifest list is written to
   * @param snapshotId ID of the snapshot this manifest list belongs to
   * @param parentSnapshotId ID of the parent snapshot; a null value is stored as the string "null"
   * @param sequenceNumber sequence number of the snapshot
   */
  V3Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) {
    super(snapshotFile, manifestListProperties(snapshotId, parentSnapshotId, sequenceNumber));
    this.indexedManifest = new V3Metadata.IndexedManifestFile(snapshotId, sequenceNumber);
  }

  /** Builds the key/value metadata that is stored with a v3 manifest list file. */
  private static ImmutableMap<String, String> manifestListProperties(
      long snapshotId, Long parentSnapshotId, long sequenceNumber) {
    return ImmutableMap.of(
        "snapshot-id", String.valueOf(snapshotId),
        "parent-snapshot-id", String.valueOf(parentSnapshotId),
        "sequence-number", String.valueOf(sequenceNumber),
        "format-version", "3");
  }

  /** Wraps the manifest so that it is written through the v3 indexed view. */
  @Override
  protected ManifestFile prepare(ManifestFile manifest) {
    return indexedManifest.wrap(manifest);
  }

  /**
   * Creates the Avro appender for the manifest list, overwriting any existing file.
   *
   * @throws RuntimeIOException if the underlying Avro writer cannot be created
   */
  @Override
  protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
    try {
      return Avro.write(file)
          .schema(V3Metadata.MANIFEST_LIST_SCHEMA)
          .named("manifest_file")
          .meta(meta)
          .overwrite()
          .build();
    } catch (IOException ioe) {
      throw new RuntimeIOException(ioe, "Failed to create snapshot list writer for path: %s", file);
    }
  }
}

static class V2Writer extends ManifestListWriter {
private final V2Metadata.IndexedManifestFile wrapper;

Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/ManifestLists.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ static ManifestListWriter write(
case 2:
return new ManifestListWriter.V2Writer(
manifestListFile, snapshotId, parentSnapshotId, sequenceNumber);
case 3:
return new ManifestListWriter.V3Writer(
manifestListFile, snapshotId, parentSnapshotId, sequenceNumber);
}
throw new UnsupportedOperationException(
"Cannot write manifest list for table version: " + formatVersion);
Expand Down
73 changes: 73 additions & 0 deletions core/src/main/java/org/apache/iceberg/ManifestWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,79 @@ public void close() throws IOException {
writer.close();
}

/**
 * Writer for format version 3 data manifests.
 *
 * <p>Entries are wrapped in a {@code V3Metadata.IndexedManifestEntry} before they are written, and
 * the manifest file is tagged with {@code format-version=3} and {@code content=data}.
 */
static class V3Writer extends ManifestWriter<DataFile> {
  private final V3Metadata.IndexedManifestEntry<DataFile> indexedEntry;

  /**
   * @param spec partition spec of the files tracked by this manifest
   * @param file output file the manifest is written to
   * @param snapshotId ID of the snapshot handed to the entry wrapper
   */
  V3Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
    super(spec, file, snapshotId);
    this.indexedEntry = new V3Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
  }

  /** Wraps the entry so that it is written through the v3 indexed view. */
  @Override
  protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
    return indexedEntry.wrap(entry);
  }

  /**
   * Creates the Avro appender for a v3 data manifest, overwriting any existing file.
   *
   * <p>The table schema, the partition spec fields and the spec ID are stored as file metadata.
   *
   * @throws RuntimeIOException if the underlying Avro writer cannot be created
   */
  @Override
  protected FileAppender<ManifestEntry<DataFile>> newAppender(
      PartitionSpec spec, OutputFile file) {
    // the entry schema depends on the partition type of the spec being written
    Schema entrySchema = V3Metadata.entrySchema(spec.partitionType());
    try {
      return Avro.write(file)
          .schema(entrySchema)
          .named("manifest_entry")
          .meta("schema", SchemaParser.toJson(spec.schema()))
          .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
          .meta("partition-spec-id", String.valueOf(spec.specId()))
          .meta("format-version", "3")
          .meta("content", "data")
          .overwrite()
          .build();
    } catch (IOException ioe) {
      throw new RuntimeIOException(ioe, "Failed to create manifest writer for path: %s", file);
    }
  }
}

/**
 * Writer for format version 3 delete manifests.
 *
 * <p>Entries are wrapped in a {@code V3Metadata.IndexedManifestEntry} before they are written, and
 * the manifest file is tagged with {@code format-version=3} and {@code content=deletes}.
 */
static class V3DeleteWriter extends ManifestWriter<DeleteFile> {
  private final V3Metadata.IndexedManifestEntry<DeleteFile> indexedEntry;

  /**
   * @param spec partition spec of the delete files tracked by this manifest
   * @param file output file the manifest is written to
   * @param snapshotId ID of the snapshot handed to the entry wrapper
   */
  V3DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
    super(spec, file, snapshotId);
    this.indexedEntry = new V3Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
  }

  /** Wraps the entry so that it is written through the v3 indexed view. */
  @Override
  protected ManifestEntry<DeleteFile> prepare(ManifestEntry<DeleteFile> entry) {
    return indexedEntry.wrap(entry);
  }

  /**
   * Creates the Avro appender for a v3 delete manifest, overwriting any existing file.
   *
   * <p>The table schema, the partition spec fields and the spec ID are stored as file metadata.
   *
   * @throws RuntimeIOException if the underlying Avro writer cannot be created
   */
  @Override
  protected FileAppender<ManifestEntry<DeleteFile>> newAppender(
      PartitionSpec spec, OutputFile file) {
    // the entry schema depends on the partition type of the spec being written
    Schema entrySchema = V3Metadata.entrySchema(spec.partitionType());
    try {
      return Avro.write(file)
          .schema(entrySchema)
          .named("manifest_entry")
          .meta("schema", SchemaParser.toJson(spec.schema()))
          .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
          .meta("partition-spec-id", String.valueOf(spec.specId()))
          .meta("format-version", "3")
          .meta("content", "deletes")
          .overwrite()
          .build();
    } catch (IOException ioe) {
      throw new RuntimeIOException(ioe, "Failed to create manifest writer for path: %s", file);
    }
  }

  /** Reports this manifest as a delete manifest. */
  @Override
  protected ManifestContent content() {
    return ManifestContent.DELETES;
  }
}

static class V2Writer extends ManifestWriter<DataFile> {
private final V2Metadata.IndexedManifestEntry<DataFile> entryWrapper;

Expand Down
43 changes: 22 additions & 21 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptingFileIO;
Expand Down Expand Up @@ -368,8 +368,8 @@ protected TableMetadata refresh() {
@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public void commit() {
// this is always set to the latest commit attempt's snapshot id.
AtomicLong newSnapshotId = new AtomicLong(-1L);
// this is always set to the latest commit attempt's snapshot
AtomicReference<Snapshot> stagedSnapshot = new AtomicReference<>();
try (Timed ignore = commitMetrics().totalDuration().start()) {
try {
Tasks.foreach(ops)
Expand All @@ -384,7 +384,7 @@ public void commit() {
.run(
taskOps -> {
Snapshot newSnapshot = apply();
newSnapshotId.set(newSnapshot.snapshotId());
stagedSnapshot.set(newSnapshot);
TableMetadata.Builder update = TableMetadata.buildFrom(base);
if (base.snapshot(newSnapshot.snapshotId()) != null) {
// this is a rollback operation
Expand Down Expand Up @@ -422,26 +422,23 @@ public void commit() {
throw e;
}

// at this point, the commit must have succeeded so the stagedSnapshot is committed
Snapshot committedSnapshot = stagedSnapshot.get();
try {
LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName());

// at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by
// id in case another commit was added between this commit and the refresh.
Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
if (saved != null) {
cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io())));
// also clean up unused manifest lists created by multiple attempts
for (String manifestList : manifestLists) {
if (!saved.manifestListLocation().equals(manifestList)) {
deleteFile(manifestList);
}
LOG.info(
"Committed snapshot {} ({})",
committedSnapshot.snapshotId(),
getClass().getSimpleName());

if (cleanupAfterCommit()) {
cleanUncommitted(Sets.newHashSet(committedSnapshot.allManifests(ops.io())));
}
// also clean up unused manifest lists created by multiple attempts
for (String manifestList : manifestLists) {
if (!committedSnapshot.manifestListLocation().equals(manifestList)) {
deleteFile(manifestList);
}
} else {
// saved may not be present if the latest metadata couldn't be loaded due to eventual
// consistency problems in refresh. in that case, don't clean up.
LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
}

} catch (Throwable e) {
LOG.warn(
"Failed to load committed table metadata or during cleanup, skipping further cleanup",
Expand Down Expand Up @@ -565,6 +562,10 @@ protected boolean canInheritSnapshotId() {
return canInheritSnapshotId;
}

/**
* Returns whether {@code commit()} should call {@code cleanUncommitted} once the commit has
* succeeded.
*
* <p>This default always returns true. Subclasses can override it to skip that cleanup when they
* know there is nothing left to clean up.
*
* @return true to run the post-commit cleanup of uncommitted manifests
*/
protected boolean cleanupAfterCommit() {
return true;
}

private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
try (ManifestReader<DataFile> reader =
ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class TableMetadata implements Serializable {
static final long INITIAL_SEQUENCE_NUMBER = 0;
static final long INVALID_SEQUENCE_NUMBER = -1;
static final int DEFAULT_TABLE_FORMAT_VERSION = 2;
static final int SUPPORTED_TABLE_FORMAT_VERSION = 2;
static final int SUPPORTED_TABLE_FORMAT_VERSION = 3;
static final int INITIAL_SPEC_ID = 0;
static final int INITIAL_SORT_ORDER_ID = 1;
static final int INITIAL_SCHEMA_ID = 0;
Expand Down
Loading

0 comments on commit 0dc50cf

Please sign in to comment.