From 8f622e5e7ea3d977df74869849580110e5f5e6a8 Mon Sep 17 00:00:00 2001 From: Grant Nicholas Date: Thu, 19 Dec 2024 13:33:59 -0600 Subject: [PATCH] Support SnapshotProducer.updateEvent(Snapshot committedSnapshot) avoids unnecessary iceberg metadata read of just committed snapshot --- .../apache/iceberg/CherryPickOperation.java | 7 +++--- .../java/org/apache/iceberg/FastAppend.java | 13 +++++++--- .../iceberg/MergingSnapshotProducer.java | 25 +++++++------------ .../org/apache/iceberg/SnapshotProducer.java | 6 ++++- 4 files changed, 26 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java index 3786b1185be6..bd960791b0e1 100644 --- a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java +++ b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java @@ -134,20 +134,19 @@ public CherryPickOperation cherrypick(long snapshotId) { } @Override - public Object updateEvent() { + public Object updateEvent(Snapshot committedSnapshot) { if (cherrypickSnapshot == null) { // NOOP operation, no snapshot created return null; } - TableMetadata tableMetadata = refresh(); - long snapshotId = tableMetadata.currentSnapshot().snapshotId(); + long snapshotId = committedSnapshot.snapshotId(); if (cherrypickSnapshot.snapshotId() == snapshotId) { // No new snapshot is created for fast-forward return null; } else { // New snapshot created, we rely on super class to fire a CreateSnapshotEvent - return super.updateEvent(); + return super.updateEvent(committedSnapshot); } } diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 51c0d5926fdb..ab7a6e8370a5 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -25,6 +25,7 @@ import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.events.CreateSnapshotEvent; import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -157,12 +158,16 @@ public List apply(TableMetadata base, Snapshot snapshot) { } @Override - public Object updateEvent() { + public Object updateEvent(Snapshot committedSnapshot) { long snapshotId = snapshotId(); - Snapshot snapshot = ops().current().snapshot(snapshotId); - long sequenceNumber = snapshot.sequenceNumber(); + ValidationException.check( + snapshotId == committedSnapshot.snapshotId(), + "Committed snapshotId %s does not match expected snapshotId %s", + committedSnapshot.snapshotId(), + snapshotId); + long sequenceNumber = committedSnapshot.sequenceNumber(); return new CreateSnapshotEvent( - tableName, operation(), snapshotId, sequenceNumber, snapshot.summary()); + tableName, operation(), snapshotId, sequenceNumber, committedSnapshot.summary()); } @Override diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 75dd7410115e..7158fb5b49eb 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -956,23 +956,16 @@ public List apply(TableMetadata base, Snapshot snapshot) { } @Override - public Object updateEvent() { + public Object updateEvent(Snapshot committedSnapshot) { long snapshotId = snapshotId(); - Snapshot justSaved = ops().refresh().snapshot(snapshotId); - long sequenceNumber = TableMetadata.INVALID_SEQUENCE_NUMBER; - Map summary; - if (justSaved == null) { - // The snapshot just saved may not be present if the latest metadata couldn't be loaded due to - // eventual - // consistency problems in refresh. - LOG.warn("Failed to load committed snapshot: omitting sequence number from notifications"); - summary = summary(); - } else { - sequenceNumber = justSaved.sequenceNumber(); - summary = justSaved.summary(); - } - - return new CreateSnapshotEvent(tableName, operation(), snapshotId, sequenceNumber, summary); + ValidationException.check( + snapshotId == committedSnapshot.snapshotId(), + "Committed snapshotId %s does not match expected snapshotId %s", + committedSnapshot.snapshotId(), + snapshotId); + long sequenceNumber = committedSnapshot.sequenceNumber(); + return new CreateSnapshotEvent( + tableName, operation(), snapshotId, sequenceNumber, committedSnapshot.summary()); } @SuppressWarnings("checkstyle:CyclomaticComplexity") diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 88b29cef32e9..95d75a4a2fb2 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -475,10 +475,14 @@ public void commit() { } } + public Object updateEvent(Snapshot committedSnapshot) { + return updateEvent(); + } + private void notifyListeners(Snapshot committedSnapshot) { try { if (committedSnapshot != null) { - Object event = updateEvent(); + Object event = updateEvent(committedSnapshot); if (event != null) { Listeners.notifyAll(event);