diff --git a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java index 3786b1185be6..bd960791b0e1 100644 --- a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java +++ b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java @@ -134,20 +134,19 @@ public CherryPickOperation cherrypick(long snapshotId) { } @Override - public Object updateEvent() { + public Object updateEvent(Snapshot committedSnapshot) { if (cherrypickSnapshot == null) { // NOOP operation, no snapshot created return null; } - TableMetadata tableMetadata = refresh(); - long snapshotId = tableMetadata.currentSnapshot().snapshotId(); + long snapshotId = committedSnapshot.snapshotId(); if (cherrypickSnapshot.snapshotId() == snapshotId) { // No new snapshot is created for fast-forward return null; } else { // New snapshot created, we rely on super class to fire a CreateSnapshotEvent - return super.updateEvent(); + return super.updateEvent(committedSnapshot); } } diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 51c0d5926fdb..ab7a6e8370a5 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -25,6 +25,7 @@ import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.events.CreateSnapshotEvent; import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -157,12 +158,16 @@ public List apply(TableMetadata base, Snapshot snapshot) { } @Override - public Object updateEvent() { + public Object updateEvent(Snapshot committedSnapshot) { long snapshotId = snapshotId(); - Snapshot snapshot = ops().current().snapshot(snapshotId); - long sequenceNumber = snapshot.sequenceNumber(); + ValidationException.check( + snapshotId == committedSnapshot.snapshotId(), + "Committed snapshotId %s does not match expected snapshotId %s", + committedSnapshot.snapshotId(), + snapshotId); + long sequenceNumber = committedSnapshot.sequenceNumber(); return new CreateSnapshotEvent( - tableName, operation(), snapshotId, sequenceNumber, snapshot.summary()); + tableName, operation(), snapshotId, sequenceNumber, committedSnapshot.summary()); } @Override diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 75dd7410115e..b174c7914cb7 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -55,12 +55,8 @@ import org.apache.iceberg.util.PartitionSet; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.Tasks; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; abstract class MergingSnapshotProducer extends SnapshotProducer { - private static final Logger LOG = LoggerFactory.getLogger(MergingSnapshotProducer.class); - // data is only added in "append" and "overwrite" operations private static final Set VALIDATE_ADDED_FILES_OPERATIONS = ImmutableSet.of(DataOperations.APPEND, DataOperations.OVERWRITE); @@ -956,23 +952,16 @@ public List apply(TableMetadata base, Snapshot snapshot) { } @Override - public Object updateEvent() { + public Object updateEvent(Snapshot committedSnapshot) { long snapshotId = snapshotId(); - Snapshot justSaved = ops().refresh().snapshot(snapshotId); - long sequenceNumber = TableMetadata.INVALID_SEQUENCE_NUMBER; - Map summary; - if (justSaved == null) { - // The snapshot just saved may not be present if the latest metadata couldn't be loaded due to - // eventual - // consistency problems in refresh. - LOG.warn("Failed to load committed snapshot: omitting sequence number from notifications"); - summary = summary(); - } else { - sequenceNumber = justSaved.sequenceNumber(); - summary = justSaved.summary(); - } - - return new CreateSnapshotEvent(tableName, operation(), snapshotId, sequenceNumber, summary); + ValidationException.check( + snapshotId == committedSnapshot.snapshotId(), + "Committed snapshotId %s does not match expected snapshotId %s", + committedSnapshot.snapshotId(), + snapshotId); + long sequenceNumber = committedSnapshot.sequenceNumber(); + return new CreateSnapshotEvent( + tableName, operation(), snapshotId, sequenceNumber, committedSnapshot.summary()); } @SuppressWarnings("checkstyle:CyclomaticComplexity") diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 88b29cef32e9..017638f5a81e 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -475,10 +475,14 @@ public void commit() { } } + Object updateEvent(Snapshot committedSnapshot) { + return updateEvent(); + } + private void notifyListeners(Snapshot committedSnapshot) { try { if (committedSnapshot != null) { - Object event = updateEvent(); + Object event = updateEvent(committedSnapshot); if (event != null) { Listeners.notifyAll(event);