From d4056530d27864adb6cf141d85c81adde46c7b28 Mon Sep 17 00:00:00 2001 From: N-o-Z Date: Wed, 17 Jan 2024 23:02:47 +0200 Subject: [PATCH] Core: Fix lock acquisition logic in HadoopTableOperations rename (#9498) --- .../iceberg/hadoop/HadoopTableOperations.java | 10 ++++- .../iceberg/hadoop/TestHadoopCommits.java | 40 +++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java index 44936f251495..9ef2c63e265c 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java @@ -360,7 +360,11 @@ int findVersion() { */ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { try { - lockManager.acquire(dst.toString(), src.toString()); + if (!lockManager.acquire(dst.toString(), src.toString())) { + throw new CommitFailedException( + "Failed to acquire lock on file: %s with owner: %s", dst, src); + } + if (fs.exists(dst)) { throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst); } @@ -383,7 +387,9 @@ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { } throw cfe; } finally { - lockManager.release(dst.toString(), src.toString()); + if (!lockManager.release(dst.toString(), src.toString())) { + LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src); + } } } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java index e02b9deaee85..b3ddc09c0ffd 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java @@ -30,18 +30,22 @@ import java.io.IOException; import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.LockManager; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -451,4 +455,40 @@ public void testConcurrentFastAppends(@TempDir File dir) throws Exception { Assertions.assertThat(Lists.newArrayList(tableWithHighRetries.snapshots())) .hasSize(threadsCount * numberOfCommitedFilesPerThread); } + + @Test + public void testCommitFailedToAcquireLock() { + table.newFastAppend().appendFile(FILE_A).commit(); + Configuration conf = new Configuration(); + LockManager lockManager = new NoLockManager(); + HadoopTableOperations tableOperations = + new HadoopTableOperations( + new Path(table.location()), new HadoopFileIO(conf), conf, lockManager); + tableOperations.refresh(); + BaseTable baseTable = (BaseTable) table; + TableMetadata meta2 = baseTable.operations().current(); + Assertions.assertThatThrownBy(() -> tableOperations.commit(tableOperations.current(), meta2)) + .isInstanceOf(CommitFailedException.class) + .hasMessageStartingWith("Failed to acquire lock on file"); + } + + // Always returns false when trying to acquire + static class NoLockManager implements LockManager { + + @Override + public boolean acquire(String entityId, String ownerId) { + return false; + } + + @Override + public boolean release(String entityId, String ownerId) { + return false; + } + + @Override + public void close() throws Exception {} + + @Override + public void initialize(Map properties) {} + } }