From 90e254807e4fe70b698cc1cc2305853efa63cc7e Mon Sep 17 00:00:00 2001 From: Nir Ozery Date: Wed, 17 Jan 2024 20:24:06 +0200 Subject: [PATCH] CR Fixes --- .../iceberg/hadoop/HadoopTableOperations.java | 10 ++++---- .../iceberg/hadoop/TestHadoopCommits.java | 25 ++++++++++--------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java index 4dcc9f690da0..9ef2c63e265c 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java @@ -360,10 +360,11 @@ int findVersion() { */ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { try { - boolean success = lockManager.acquire(dst.toString(), src.toString()); - if (!success) { - throw new CommitFailedException("Failed to acquire lock on file: %s", dst); + if (!lockManager.acquire(dst.toString(), src.toString())) { + throw new CommitFailedException( + "Failed to acquire lock on file: %s with owner: %s", dst, src); } + if (fs.exists(dst)) { throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst); } @@ -386,8 +387,7 @@ private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { } throw cfe; } finally { - boolean success = lockManager.release(dst.toString(), src.toString()); - if (!success) { + if (!lockManager.release(dst.toString(), src.toString())) { LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src); } } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java index 0b8e289955e1..b3ddc09c0ffd 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java @@ -457,37 +457,38 @@ public void testConcurrentFastAppends(@TempDir File dir) throws Exception { } @Test - public void testCommitFailedToAcquire() { + public void testCommitFailedToAcquireLock() { table.newFastAppend().appendFile(FILE_A).commit(); Configuration conf = new Configuration(); LockManager lockManager = new NoLockManager(); - HadoopTableOperations tops = - new HadoopTableOperations(new Path(table.location()), new HadoopFileIO(conf), conf, lockManager); - tops.refresh(); + HadoopTableOperations tableOperations = + new HadoopTableOperations( + new Path(table.location()), new HadoopFileIO(conf), conf, lockManager); + tableOperations.refresh(); BaseTable baseTable = (BaseTable) table; TableMetadata meta2 = baseTable.operations().current(); - Assertions.assertThatThrownBy(() -> tops.commit(tops.current(), meta2)) - .isInstanceOf(CommitFailedException.class) - .hasMessageStartingWith("Failed to acquire lock on file"); + Assertions.assertThatThrownBy(() -> tableOperations.commit(tableOperations.current(), meta2)) + .isInstanceOf(CommitFailedException.class) + .hasMessageStartingWith("Failed to acquire lock on file"); } // Always returns false when trying to acquire static class NoLockManager implements LockManager { - + @Override public boolean acquire(String entityId, String ownerId) { return false; } - + @Override public boolean release(String entityId, String ownerId) { return false; } - + @Override public void close() throws Exception {} - + @Override public void initialize(Map properties) {} } -} \ No newline at end of file +}