From 7b2c7acee12e60f3a5413994a2919b30d70f8336 Mon Sep 17 00:00:00 2001 From: nk1506 Date: Thu, 12 Oct 2023 00:26:36 +0530 Subject: [PATCH] Build: Replace Thread.Sleep() usage with org.Awaitility from Tests. --- .../java/org/apache/iceberg/TestHelpers.java | 56 +++++++++++++++++++ .../iceberg/metrics/TestDefaultTimer.java | 46 ++++----------- .../aws/TestAssumeRoleAwsClientFactory.java | 19 ++++++- .../aws/dynamodb/TestDynamoDbLockManager.java | 17 +++--- .../iceberg/aws/glue/TestGlueCatalogLock.java | 18 +++--- .../lakeformation/LakeFormationTestBase.java | 6 +- .../TestLakeFormationAwsClientFactory.java | 8 ++- build.gradle | 2 + .../iceberg/actions/TestCommitService.java | 9 +-- .../iceberg/hadoop/TestHadoopCommits.java | 17 +++--- .../jdbc/TestJdbcTableConcurrency.java | 17 +++--- .../iceberg/util/TestInMemoryLockManager.java | 13 +---- .../hive/TestHiveTableConcurrency.java | 18 +++--- 13 files changed, 148 insertions(+), 98 deletions(-) diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java index 153e2de7ea9a..3bd42ff97dc9 100644 --- a/api/src/test/java/org/apache/iceberg/TestHelpers.java +++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java @@ -38,6 +38,8 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.apache.iceberg.expressions.BoundPredicate; import org.apache.iceberg.expressions.BoundSetPredicate; @@ -47,10 +49,16 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.ByteBuffers; import org.assertj.core.api.Assertions; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionFactory; +import org.awaitility.core.ThrowingRunnable; import org.objenesis.strategy.StdInstantiatorStrategy; public class TestHelpers { + public static final long DEFAULT_MAX_WAIT_TIME = + 300000L; // for Awaitility it can be used for max wait time of 5 minutes. + private TestHelpers() {} /** Wait in a tight check loop until system clock is past {@code timestampMillis} */ @@ -62,6 +70,54 @@ public static long waitUntilAfter(long timestampMillis) { return current; } + /** wait for fixed duration */ + public static void delayInMilliseconds(String message, long delay, boolean useSameThread) { + ConditionFactory conditionFactory = Awaitility.await(message); + if (useSameThread) { + conditionFactory = conditionFactory.pollInSameThread(); + } + conditionFactory + .pollDelay(delay, TimeUnit.MILLISECONDS) + .atMost(delay + 1, TimeUnit.MILLISECONDS) + .until(() -> true); + } + + /** + * Wait until the condition is true, or it reaches the max wait time. It evaluates the condition + * on the defined poll interval. It helps to avoid infinite loops. + */ + public static void delayUntilTrueCondition( + String message, + long minWaitingTime, + long pollInterval, + long maxWaitingTime, + TimeUnit timeUnit, + Callable condition) { + Awaitility.await(message) + .pollInterval(pollInterval, timeUnit) + .pollDelay(minWaitingTime, timeUnit) + .atMost(maxWaitingTime, timeUnit) + .until(condition); + } + + /** + * Wait until the assertion is true, or it reaches the max wait time. It can be used to wait until + * true assertion. + */ + public static void delayUntilAssertedCondition( + String message, + long minWaitingTime, + long pollInterval, + long maxWaitingTime, + TimeUnit timeUnit, + ThrowingRunnable condition) { + Awaitility.await(message) + .pollInterval(pollInterval, timeUnit) + .pollDelay(minWaitingTime, timeUnit) + .atMost(maxWaitingTime, timeUnit) + .untilAsserted(condition); + } + public static T assertAndUnwrap(Expression expr, Class expected) { assertThat(expr).as("Expression should have expected type: " + expected).isInstanceOf(expected); return expected.cast(expr); diff --git a/api/src/test/java/org/apache/iceberg/metrics/TestDefaultTimer.java b/api/src/test/java/org/apache/iceberg/metrics/TestDefaultTimer.java index 950bbc931da4..c8905e6df3b5 100644 --- a/api/src/test/java/org/apache/iceberg/metrics/TestDefaultTimer.java +++ b/api/src/test/java/org/apache/iceberg/metrics/TestDefaultTimer.java @@ -20,6 +20,7 @@ import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.iceberg.TestHelpers.delayInMilliseconds; import java.time.Duration; import java.util.List; @@ -31,6 +32,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.iceberg.TestHelpers; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -87,12 +89,12 @@ public void multipleStops() { } @Test - public void closeableTimer() throws InterruptedException { + public void closeableTimer() { Timer timer = new DefaultTimer(TimeUnit.NANOSECONDS); Assertions.assertThat(timer.count()).isEqualTo(0); Assertions.assertThat(timer.totalDuration()).isEqualTo(Duration.ZERO); - try (Timer.Timed sample = timer.start()) { - Thread.sleep(500L); + try (Timer.Timed ignored = timer.start()) { + delayInMilliseconds("wait 100 ms before timer to close", 100, false); } Assertions.assertThat(timer.count()).isEqualTo(1); Assertions.assertThat(timer.totalDuration()).isGreaterThan(Duration.ZERO); @@ -101,14 +103,7 @@ public void closeableTimer() throws InterruptedException { @Test public void measureRunnable() { Timer timer = new DefaultTimer(TimeUnit.NANOSECONDS); - Runnable runnable = - () -> { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }; + Runnable runnable = () -> TestHelpers.delayInMilliseconds("wait for 100 ms", 100, false); Assertions.assertThat(timer.count()).isEqualTo(0); Assertions.assertThat(timer.totalDuration()).isEqualTo(Duration.ZERO); @@ -128,11 +123,7 @@ public void measureCallable() throws Exception { Timer timer = new DefaultTimer(TimeUnit.NANOSECONDS); Callable callable = () -> { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + delayInMilliseconds("wait for 100 ms", 100, false); return true; }; Assertions.assertThat(timer.count()).isEqualTo(0); @@ -154,11 +145,7 @@ public void measureSupplier() { Timer timer = new DefaultTimer(TimeUnit.NANOSECONDS); Supplier supplier = () -> { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + delayInMilliseconds("wait for 100 ms", 100, false); return true; }; Assertions.assertThat(timer.count()).isEqualTo(0); @@ -179,23 +166,12 @@ public void measureSupplier() { public void measureNestedRunnables() { Timer timer = new DefaultTimer(TimeUnit.NANOSECONDS); Timer innerTimer = new DefaultTimer(TimeUnit.NANOSECONDS); - Runnable inner = - () -> { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }; + Runnable inner = () -> delayInMilliseconds("wait 100 ms for inner timer", 100, false); Runnable outer = () -> { - try { - Thread.sleep(100); - innerTimer.time(inner); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + delayInMilliseconds("wait 100 ms for outer timer", 100, false); + innerTimer.time(inner); }; Assertions.assertThat(timer.count()).isEqualTo(0); diff --git a/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java b/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java index 4f2b3b8a1f79..4f8da4a6e76d 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java @@ -20,6 +20,8 @@ import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.aws.glue.GlueCatalog; import org.apache.iceberg.aws.s3.S3FileIO; import org.apache.iceberg.catalog.Namespace; @@ -42,6 +44,7 @@ import software.amazon.awssdk.services.iam.model.CreateRoleResponse; import software.amazon.awssdk.services.iam.model.DeleteRolePolicyRequest; import software.amazon.awssdk.services.iam.model.DeleteRoleRequest; +import software.amazon.awssdk.services.iam.model.GetRolePolicyRequest; import software.amazon.awssdk.services.iam.model.PutRolePolicyRequest; import software.amazon.awssdk.services.s3.model.S3Exception; @@ -100,7 +103,7 @@ public void after() { } @Test - public void testAssumeRoleGlueCatalog() throws Exception { + public void testAssumeRoleGlueCatalog() { String glueArnPrefix = "arn:aws:glue:*:" + AwsIntegTestUtil.testAccountId(); iam.putRolePolicy( PutRolePolicyRequest.builder() @@ -189,7 +192,17 @@ public void testAssumeRoleS3FileIO() throws Exception { Assert.assertFalse("should be able to access file", inputFile.exists()); } - private void waitForIamConsistency() throws Exception { - Thread.sleep(10000); // sleep to make sure IAM up to date + private void waitForIamConsistency() { + TestHelpers.delayUntilTrueCondition( + "wait for IAM role policy to update.", + 1000, + 1001, + 10001, + TimeUnit.MILLISECONDS, + () -> + iam.getRolePolicy( + GetRolePolicyRequest.builder().roleName(roleName).roleName(policyName).build()) + .roleName() + .equalsIgnoreCase(roleName)); } } diff --git a/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/TestDynamoDbLockManager.java b/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/TestDynamoDbLockManager.java index 90b8a8347395..5df9958ba609 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/TestDynamoDbLockManager.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/TestDynamoDbLockManager.java @@ -27,6 +27,7 @@ import java.util.stream.IntStream; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.aws.AwsClientFactories; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -141,11 +142,8 @@ public void testAcquireSingleProcess() throws Exception { CompletableFuture.supplyAsync( () -> { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + TestHelpers.delayInMilliseconds( + "wait for 5 seconds for DynamoDbLockManager to release", 5000, true); Assert.assertTrue(lockManager.release(entityId, oldOwner)); return null; }); @@ -176,11 +174,10 @@ public void testAcquireMultiProcessAllSucceed() throws Exception { String owner = UUID.randomUUID().toString(); boolean succeeded = threadLocalLockManager.acquire(entityId, owner); if (succeeded) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + TestHelpers.delayInMilliseconds( + "wait for 1 second for DynamoDbLockManager to release", + 1000, + true); Assert.assertTrue(threadLocalLockManager.release(entityId, owner)); } return succeeded; diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogLock.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogLock.java index 4c48a277192d..29578fa7aa33 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogLock.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogLock.java @@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -31,6 +32,7 @@ import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.aws.dynamodb.DynamoDbLockManager; import org.apache.iceberg.aws.s3.S3FileIOProperties; @@ -139,14 +141,14 @@ public void testParallelCommitMultiThreadMultiCommit() { .run( index -> { for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) { - while (barrier.get() < numCommittedFiles * 2) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - + final int currentFilesCount = numCommittedFiles; + org.apache.iceberg.TestHelpers.delayUntilTrueCondition( + "wait for atomic integer to increment", + 10, + 10, + TestHelpers.DEFAULT_MAX_WAIT_TIME, + TimeUnit.MILLISECONDS, + () -> !(barrier.get() < currentFilesCount * 2)); table.newFastAppend().appendFile(file).commit(); barrier.incrementAndGet(); } diff --git a/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/LakeFormationTestBase.java b/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/LakeFormationTestBase.java index 431ee91bd22f..4ddfdc815784 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/LakeFormationTestBase.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/LakeFormationTestBase.java @@ -27,6 +27,7 @@ import java.util.UUID; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.aws.AssumeRoleAwsClientFactory; import org.apache.iceberg.aws.AwsIntegTestUtil; import org.apache.iceberg.aws.AwsProperties; @@ -358,7 +359,10 @@ String getRandomTableName() { } private static void waitForIamConsistency() throws Exception { - Thread.sleep(IAM_PROPAGATION_DELAY); // sleep to make sure IAM up to date + TestHelpers.delayInMilliseconds( + "wait 10 seconds for IAM policy to update", + IAM_PROPAGATION_DELAY, + true); // wait to make sure IAM up to date } private static LakeFormationClient buildLakeFormationClient( diff --git a/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/TestLakeFormationAwsClientFactory.java b/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/TestLakeFormationAwsClientFactory.java index 9c59ba1aa19e..6f83dbca8578 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/TestLakeFormationAwsClientFactory.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/TestLakeFormationAwsClientFactory.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.UUID; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.aws.AwsIntegTestUtil; import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.aws.HttpClientProperties; @@ -163,7 +164,10 @@ public void testLakeFormationEnabledGlueCatalog() throws Exception { } } - private void waitForIamConsistency() throws Exception { - Thread.sleep(IAM_PROPAGATION_DELAY); // sleep to make sure IAM up to date + private void waitForIamConsistency() { + TestHelpers.delayInMilliseconds( + "wait 10 seconds for IAM policy to update", + IAM_PROPAGATION_DELAY, + true); // wait to make sure IAM up to date } } diff --git a/build.gradle b/build.gradle index 55830c6990f4..82f409858005 100644 --- a/build.gradle +++ b/build.gradle @@ -293,6 +293,7 @@ project(':iceberg-api') { compileOnly libs.findbugs.jsr305 testImplementation libs.avro.avro testImplementation libs.esotericsoftware.kryo + testImplementation libs.awaitility } tasks.processTestResources.dependsOn rootProject.tasks.buildInfo @@ -712,6 +713,7 @@ project(':iceberg-hive-metastore') { } testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') + testImplementation libs.awaitility } } diff --git a/core/src/test/java/org/apache/iceberg/actions/TestCommitService.java b/core/src/test/java/org/apache/iceberg/actions/TestCommitService.java index 1aae6483337f..661194298aba 100644 --- a/core/src/test/java/org/apache/iceberg/actions/TestCommitService.java +++ b/core/src/test/java/org/apache/iceberg/actions/TestCommitService.java @@ -29,6 +29,7 @@ import java.util.stream.IntStream; import org.apache.iceberg.Table; import org.apache.iceberg.TableTestBase; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -122,12 +123,8 @@ private static class CustomCommitService extends BaseCommitService { @Override protected void commitOrClean(Set batch) { - try { - // Slightly longer than timeout - Thread.sleep(210); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + // Slightly longer than timeout + TestHelpers.delayInMilliseconds("wait for 210 milliseconds", 210, true); } @Override diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java index b9c23f33a583..e16fee03fe50 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java @@ -33,6 +33,7 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.hadoop.fs.FileSystem; @@ -46,6 +47,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -435,13 +437,14 @@ public void testConcurrentFastAppends(@TempDir File dir) throws Exception { for (int numCommittedFiles = 0; numCommittedFiles < numberOfCommitedFilesPerThread; numCommittedFiles++) { - while (barrier.get() < numCommittedFiles * threadsCount) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } + final int currentFilesCount = numCommittedFiles; + TestHelpers.delayUntilTrueCondition( + "wait for atomic integer to increment", + 10, + 10, + TestHelpers.DEFAULT_MAX_WAIT_TIME, + TimeUnit.MILLISECONDS, + () -> !(barrier.get() < currentFilesCount * threadsCount)); tableWithHighRetries.newFastAppend().appendFile(file).commit(); barrier.incrementAndGet(); } diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcTableConcurrency.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcTableConcurrency.java index b4c5677dff80..1bf96ebbba89 100644 --- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcTableConcurrency.java +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcTableConcurrency.java @@ -40,6 +40,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -92,14 +93,14 @@ public synchronized void testConcurrentFastAppends() throws IOException { .run( index -> { for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) { - while (barrier.get() < numCommittedFiles * 2) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - + final int currentFilesCount = numCommittedFiles; + TestHelpers.delayUntilTrueCondition( + "wait for atomic integer to increment", + 10, + 10, + TestHelpers.DEFAULT_MAX_WAIT_TIME, + TimeUnit.MILLISECONDS, + () -> !(barrier.get() < currentFilesCount * 2)); icebergTable.newFastAppend().appendFile(file).commit(); barrier.incrementAndGet(); } diff --git a/core/src/test/java/org/apache/iceberg/util/TestInMemoryLockManager.java b/core/src/test/java/org/apache/iceberg/util/TestInMemoryLockManager.java index 7f7a39706517..85ea31584178 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestInMemoryLockManager.java +++ b/core/src/test/java/org/apache/iceberg/util/TestInMemoryLockManager.java @@ -26,6 +26,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.assertj.core.api.Assertions; @@ -111,11 +112,7 @@ public void testAcquireSingleProcess() throws Exception { CompletableFuture.supplyAsync( () -> { - try { - Thread.sleep(200); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + TestHelpers.delayInMilliseconds("wait for 200 milliseconds", 200, true); assertThat(lockManager.release(lockEntityId, oldOwner)).isTrue(); return null; }); @@ -140,11 +137,7 @@ public void testAcquireMultiProcessAllSucceed() { String owner = UUID.randomUUID().toString(); boolean succeeded = lockManager.acquire(lockEntityId, owner); if (succeeded) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + TestHelpers.delayInMilliseconds("wait for 1 second", 1000L, true); assertThat(lockManager.release(lockEntityId, owner)).isTrue(); } return succeeded; diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java index e7608962cb85..d7337107b2b6 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java @@ -21,6 +21,7 @@ import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.TestHelpers.DEFAULT_MAX_WAIT_TIME; import static org.assertj.core.api.Assertions.assertThat; import java.util.UUID; @@ -33,6 +34,7 @@ import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; import org.apache.iceberg.util.Tasks; import org.junit.jupiter.api.Test; @@ -63,14 +65,14 @@ public synchronized void testConcurrentFastAppends() { .run( index -> { for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) { - while (barrier.get() < numCommittedFiles * 2) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - + final int currentFilesCount = numCommittedFiles; + TestHelpers.delayUntilTrueCondition( + "wait for atomic integer to increment", + 10, + 10, + DEFAULT_MAX_WAIT_TIME, + TimeUnit.MILLISECONDS, + () -> !(barrier.get() < currentFilesCount * 2)); icebergTable.newFastAppend().appendFile(file).commit(); barrier.incrementAndGet(); }