Skip to content

Commit

Permalink
Only fire updateEvent if snapshot has commmitted
Browse files Browse the repository at this point in the history
  • Loading branch information
grantatspothero committed Dec 19, 2024
1 parent 3535240 commit 6da8188
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ protected TableMetadata refresh() {
public void commit() {
// this is always set to the latest commit attempt's snapshot
AtomicReference<Snapshot> stagedSnapshot = new AtomicReference<>();
Snapshot committedSnapshot;
try (Timed ignore = commitMetrics().totalDuration().start()) {
try {
Tasks.foreach(ops)
Expand Down Expand Up @@ -444,7 +445,7 @@ public void commit() {
}

// at this point, the commit must have succeeded so the stagedSnapshot is committed
Snapshot committedSnapshot = stagedSnapshot.get();
committedSnapshot = stagedSnapshot.get();
try {
LOG.info(
"Committed snapshot {} ({})",
Expand All @@ -468,31 +469,33 @@ public void commit() {
}

try {
notifyListeners();
notifyListeners(committedSnapshot);
} catch (Throwable e) {
LOG.warn("Failed to notify event listeners", e);
}
}

private void notifyListeners() {
private void notifyListeners(Snapshot committedSnapshot) {
try {
Object event = updateEvent();
if (event != null) {
Listeners.notifyAll(event);

if (event instanceof CreateSnapshotEvent) {
CreateSnapshotEvent createSnapshotEvent = (CreateSnapshotEvent) event;

reporter.report(
ImmutableCommitReport.builder()
.tableName(createSnapshotEvent.tableName())
.snapshotId(createSnapshotEvent.snapshotId())
.operation(createSnapshotEvent.operation())
.sequenceNumber(createSnapshotEvent.sequenceNumber())
.metadata(EnvironmentContext.get())
.commitMetrics(
CommitMetricsResult.from(commitMetrics(), createSnapshotEvent.summary()))
.build());
if (committedSnapshot != null) {
Object event = updateEvent();
if (event != null) {
Listeners.notifyAll(event);

if (event instanceof CreateSnapshotEvent) {
CreateSnapshotEvent createSnapshotEvent = (CreateSnapshotEvent) event;

reporter.report(
ImmutableCommitReport.builder()
.tableName(createSnapshotEvent.tableName())
.snapshotId(createSnapshotEvent.snapshotId())
.operation(createSnapshotEvent.operation())
.sequenceNumber(createSnapshotEvent.sequenceNumber())
.metadata(EnvironmentContext.get())
.commitMetrics(
CommitMetricsResult.from(commitMetrics(), createSnapshotEvent.summary()))
.build());
}
}
}
} catch (RuntimeException e) {
Expand Down

0 comments on commit 6da8188

Please sign in to comment.