From 67c79457378f8ffa8fb972041167b20cac50ea74 Mon Sep 17 00:00:00 2001 From: Nir Ozery Date: Wed, 17 Jan 2024 18:18:52 +0200 Subject: [PATCH] Fix: HadoopTableOperations renameToFinal --- .../iceberg/hadoop/HadoopTableOperations.java | 10 ++++- .../iceberg/hadoop/TestHadoopCommits.java | 41 ++++++++++++++++++- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java index 44936f251495..4dcc9f690da0 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java @@ -360,7 +360,10 @@ int findVersion() { */ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { try { - lockManager.acquire(dst.toString(), src.toString()); + boolean success = lockManager.acquire(dst.toString(), src.toString()); + if (!success) { + throw new CommitFailedException("Failed to acquire lock on file: %s", dst); + } if (fs.exists(dst)) { throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst); } @@ -383,7 +386,10 @@ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { } throw cfe; } finally { - lockManager.release(dst.toString(), src.toString()); + boolean success = lockManager.release(dst.toString(), src.toString()); + if (!success) { + LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src); + } } } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java index e02b9deaee85..0b8e289955e1 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java @@ -30,18 +30,22 @@ import java.io.IOException; import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.LockManager; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -451,4 +455,39 @@ public void testConcurrentFastAppends(@TempDir File dir) throws Exception { Assertions.assertThat(Lists.newArrayList(tableWithHighRetries.snapshots())) .hasSize(threadsCount * numberOfCommitedFilesPerThread); } -} + + @Test + public void testCommitFailedToAcquire() { + table.newFastAppend().appendFile(FILE_A).commit(); + Configuration conf = new Configuration(); + LockManager lockManager = new NoLockManager(); + HadoopTableOperations tops = + new HadoopTableOperations(new Path(table.location()), new HadoopFileIO(conf), conf, lockManager); + tops.refresh(); + BaseTable baseTable = (BaseTable) table; + TableMetadata meta2 = baseTable.operations().current(); + Assertions.assertThatThrownBy(() -> tops.commit(tops.current(), meta2)) + .isInstanceOf(CommitFailedException.class) + .hasMessageStartingWith("Failed to acquire lock on file"); + } + + // Always returns false when trying to acquire + static class NoLockManager implements LockManager { + + @Override + public boolean acquire(String entityId, String ownerId) { + return false; + } + + @Override + public boolean release(String entityId, String ownerId) { + return false; + } + + @Override + public void close() throws Exception {} + + @Override + public void initialize(Map properties) {} + } +} \ No newline at end of file