Skip to content

Commit

Permalink
Support SnapshotProducer.updateEvent(Snapshot committedSnapshot)
Browse files Browse the repository at this point in the history
avoids unnecessary iceberg metadata read of just committed snapshot
  • Loading branch information
grantatspothero committed Dec 20, 2024
1 parent 6da8188 commit a710e1c
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,19 @@ public CherryPickOperation cherrypick(long snapshotId) {
}

@Override
public Object updateEvent() {
public Object updateEvent(Snapshot committedSnapshot) {
if (cherrypickSnapshot == null) {
// NOOP operation, no snapshot created
return null;
}

TableMetadata tableMetadata = refresh();
long snapshotId = tableMetadata.currentSnapshot().snapshotId();
long snapshotId = committedSnapshot.snapshotId();
if (cherrypickSnapshot.snapshotId() == snapshotId) {
// No new snapshot is created for fast-forward
return null;
} else {
// New snapshot created, we rely on super class to fire a CreateSnapshotEvent
return super.updateEvent();
return super.updateEvent(committedSnapshot);
}
}

Expand Down
13 changes: 9 additions & 4 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -157,12 +158,16 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
}

@Override
public Object updateEvent() {
public Object updateEvent(Snapshot committedSnapshot) {
long snapshotId = snapshotId();
Snapshot snapshot = ops().current().snapshot(snapshotId);
long sequenceNumber = snapshot.sequenceNumber();
ValidationException.check(
snapshotId == committedSnapshot.snapshotId(),
"Committed snapshotId %s does not match expected snapshotId %s",
committedSnapshot.snapshotId(),
snapshotId);
long sequenceNumber = committedSnapshot.sequenceNumber();
return new CreateSnapshotEvent(
tableName, operation(), snapshotId, sequenceNumber, snapshot.summary());
tableName, operation(), snapshotId, sequenceNumber, committedSnapshot.summary());
}

@Override
Expand Down
29 changes: 9 additions & 20 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,8 @@
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private static final Logger LOG = LoggerFactory.getLogger(MergingSnapshotProducer.class);

// data is only added in "append" and "overwrite" operations
private static final Set<String> VALIDATE_ADDED_FILES_OPERATIONS =
ImmutableSet.of(DataOperations.APPEND, DataOperations.OVERWRITE);
Expand Down Expand Up @@ -956,23 +952,16 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
}

@Override
public Object updateEvent() {
public Object updateEvent(Snapshot committedSnapshot) {
long snapshotId = snapshotId();
Snapshot justSaved = ops().refresh().snapshot(snapshotId);
long sequenceNumber = TableMetadata.INVALID_SEQUENCE_NUMBER;
Map<String, String> summary;
if (justSaved == null) {
// The snapshot just saved may not be present if the latest metadata couldn't be loaded due to
// eventual
// consistency problems in refresh.
LOG.warn("Failed to load committed snapshot: omitting sequence number from notifications");
summary = summary();
} else {
sequenceNumber = justSaved.sequenceNumber();
summary = justSaved.summary();
}

return new CreateSnapshotEvent(tableName, operation(), snapshotId, sequenceNumber, summary);
ValidationException.check(
snapshotId == committedSnapshot.snapshotId(),
"Committed snapshotId %s does not match expected snapshotId %s",
committedSnapshot.snapshotId(),
snapshotId);
long sequenceNumber = committedSnapshot.sequenceNumber();
return new CreateSnapshotEvent(
tableName, operation(), snapshotId, sequenceNumber, committedSnapshot.summary());
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,14 @@ public void commit() {
}
}

Object updateEvent(Snapshot committedSnapshot) {
return updateEvent();
}

private void notifyListeners(Snapshot committedSnapshot) {
try {
if (committedSnapshot != null) {
Object event = updateEvent();
Object event = updateEvent(committedSnapshot);
if (event != null) {
Listeners.notifyAll(event);

Expand Down

0 comments on commit a710e1c

Please sign in to comment.