Skip to content

Commit

Permalink
Build: Replace Thread.Sleep() usage with org.Awaitility from Tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
nk1506 committed Oct 11, 2023
1 parent b5ea0d5 commit 6e76125
Show file tree
Hide file tree
Showing 13 changed files with 142 additions and 98 deletions.
54 changes: 54 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.iceberg.expressions.BoundPredicate;
import org.apache.iceberg.expressions.BoundSetPredicate;
Expand All @@ -47,10 +49,15 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.ByteBuffers;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ThrowingRunnable;
import org.objenesis.strategy.StdInstantiatorStrategy;

public class TestHelpers {

public static final long DEFAULT_MAX_WAIT_TIME =
3600000L; // for Awaitility it can be used for max wait time of 1 hour.

private TestHelpers() {}

/** Wait in a tight check loop until system clock is past {@code timestampMillis} */
Expand All @@ -62,6 +69,53 @@ public static long waitUntilAfter(long timestampMillis) {
return current;
}

/** wait for fixed duration */
public static void delayInMilliseconds(String message, long delay) {
long previousTimestampMillis = System.currentTimeMillis();
Awaitility.await(message)
.pollInSameThread()
.pollDelay(delay, TimeUnit.MILLISECONDS)
.pollInterval(delay + 1, TimeUnit.MILLISECONDS)
.atMost(delay + 2, TimeUnit.MILLISECONDS)
.until(() -> System.currentTimeMillis() - previousTimestampMillis > delay);
}

/**
* Wait until the condition is true, or it reaches the max wait time. It evaluates the condition
* on the defined poll interval. It helps to avoid infinite loops.
*/
public static void delayUntilTrueCondition(
String message,
long minWaitingTime,
long pollInterval,
long maxWaitingTime,
TimeUnit timeUnit,
Callable<Boolean> condition) {
Awaitility.await(message)
.pollInterval(pollInterval, timeUnit)
.pollDelay(minWaitingTime, timeUnit)
.atMost(maxWaitingTime, timeUnit)
.until(condition);
}

/**
* Wait until the assertion is true, or it reaches the max wait time. It can be used to wait until
* true assertion.
*/
public static void delayUntilAssertedCondition(
String message,
long minWaitingTime,
long pollInterval,
long maxWaitingTime,
TimeUnit timeUnit,
ThrowingRunnable condition) {
Awaitility.await(message)
.pollInterval(pollInterval, timeUnit)
.pollDelay(minWaitingTime, timeUnit)
.atMost(maxWaitingTime, timeUnit)
.untilAsserted(condition);
}

public static <T> T assertAndUnwrap(Expression expr, Class<T> expected) {
assertThat(expr).as("Expression should have expected type: " + expected).isInstanceOf(expected);
return expected.cast(expr);
Expand Down
46 changes: 11 additions & 35 deletions api/src/test/java/org/apache/iceberg/metrics/TestDefaultTimer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.iceberg.TestHelpers.delayInMilliseconds;

import java.time.Duration;
import java.util.List;
Expand All @@ -31,6 +32,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.TestHelpers;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -87,12 +89,12 @@ public void multipleStops() {
}

@Test
public void closeableTimer() throws InterruptedException {
public void closeableTimer() {
Timer timer = new DefaultTimer(TimeUnit.NANOSECONDS);
Assertions.assertThat(timer.count()).isEqualTo(0);
Assertions.assertThat(timer.totalDuration()).isEqualTo(Duration.ZERO);
try (Timer.Timed sample = timer.start()) {
Thread.sleep(500L);
try (Timer.Timed ignored = timer.start()) {
delayInMilliseconds("wait 100 ms before timer to close", 100);
}
Assertions.assertThat(timer.count()).isEqualTo(1);
Assertions.assertThat(timer.totalDuration()).isGreaterThan(Duration.ZERO);
Expand All @@ -101,14 +103,7 @@ public void closeableTimer() throws InterruptedException {
@Test
public void measureRunnable() {
Timer timer = new DefaultTimer(TimeUnit.NANOSECONDS);
Runnable runnable =
() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
Runnable runnable = () -> TestHelpers.delayInMilliseconds("wait for 100 ms", 100);
Assertions.assertThat(timer.count()).isEqualTo(0);
Assertions.assertThat(timer.totalDuration()).isEqualTo(Duration.ZERO);

Expand All @@ -128,11 +123,7 @@ public void measureCallable() throws Exception {
Timer timer = new DefaultTimer(TimeUnit.NANOSECONDS);
Callable<Boolean> callable =
() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
delayInMilliseconds("wait for 100 ms", 100);
return true;
};
Assertions.assertThat(timer.count()).isEqualTo(0);
Expand All @@ -154,11 +145,7 @@ public void measureSupplier() {
Timer timer = new DefaultTimer(TimeUnit.NANOSECONDS);
Supplier<Boolean> supplier =
() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
delayInMilliseconds("wait for 100 ms", 100);
return true;
};
Assertions.assertThat(timer.count()).isEqualTo(0);
Expand All @@ -179,23 +166,12 @@ public void measureSupplier() {
public void measureNestedRunnables() {
Timer timer = new DefaultTimer(TimeUnit.NANOSECONDS);
Timer innerTimer = new DefaultTimer(TimeUnit.NANOSECONDS);
Runnable inner =
() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
Runnable inner = () -> delayInMilliseconds("wait 100 ms for inner timer", 100);

Runnable outer =
() -> {
try {
Thread.sleep(100);
innerTimer.time(inner);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
delayInMilliseconds("wait 100 ms for outer timer", 100);
innerTimer.time(inner);
};

Assertions.assertThat(timer.count()).isEqualTo(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.catalog.Namespace;
Expand All @@ -42,6 +44,7 @@
import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
import software.amazon.awssdk.services.iam.model.DeleteRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.DeleteRoleRequest;
import software.amazon.awssdk.services.iam.model.GetRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.PutRolePolicyRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

Expand Down Expand Up @@ -100,7 +103,7 @@ public void after() {
}

@Test
public void testAssumeRoleGlueCatalog() throws Exception {
public void testAssumeRoleGlueCatalog() {
String glueArnPrefix = "arn:aws:glue:*:" + AwsIntegTestUtil.testAccountId();
iam.putRolePolicy(
PutRolePolicyRequest.builder()
Expand Down Expand Up @@ -189,7 +192,17 @@ public void testAssumeRoleS3FileIO() throws Exception {
Assert.assertFalse("should be able to access file", inputFile.exists());
}

private void waitForIamConsistency() throws Exception {
Thread.sleep(10000); // sleep to make sure IAM up to date
private void waitForIamConsistency() {
TestHelpers.delayUntilTrueCondition(
"wait for IAM role policy to update.",
1000,
1001,
10001,
TimeUnit.MILLISECONDS,
() ->
iam.getRolePolicy(
GetRolePolicyRequest.builder().roleName(roleName).roleName(policyName).build())
.roleName()
.equalsIgnoreCase(roleName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.aws.AwsClientFactories;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand Down Expand Up @@ -141,11 +142,8 @@ public void testAcquireSingleProcess() throws Exception {

CompletableFuture.supplyAsync(
() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
TestHelpers.delayInMilliseconds(
"wait for 5 seconds for DynamoDbLockManager to release", 5000);
Assert.assertTrue(lockManager.release(entityId, oldOwner));
return null;
});
Expand Down Expand Up @@ -176,11 +174,8 @@ public void testAcquireMultiProcessAllSucceed() throws Exception {
String owner = UUID.randomUUID().toString();
boolean succeeded = threadLocalLockManager.acquire(entityId, owner);
if (succeeded) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
TestHelpers.delayInMilliseconds(
"wait for 1 second for DynamoDbLockManager to release", 1000);
Assert.assertTrue(threadLocalLockManager.release(entityId, owner));
}
return succeeded;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -31,6 +32,7 @@
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.dynamodb.DynamoDbLockManager;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
Expand Down Expand Up @@ -139,14 +141,14 @@ public void testParallelCommitMultiThreadMultiCommit() {
.run(
index -> {
for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
while (barrier.get() < numCommittedFiles * 2) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

final int currentFilesCount = numCommittedFiles;
org.apache.iceberg.TestHelpers.delayUntilTrueCondition(
"wait for atomic integer to increment",
10,
10,
TestHelpers.DEFAULT_MAX_WAIT_TIME,
TimeUnit.MILLISECONDS,
() -> !(barrier.get() < currentFilesCount * 2));
table.newFastAppend().appendFile(file).commit();
barrier.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.UUID;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.aws.AssumeRoleAwsClientFactory;
import org.apache.iceberg.aws.AwsIntegTestUtil;
import org.apache.iceberg.aws.AwsProperties;
Expand Down Expand Up @@ -358,7 +359,9 @@ String getRandomTableName() {
}

private static void waitForIamConsistency() throws Exception {
Thread.sleep(IAM_PROPAGATION_DELAY); // sleep to make sure IAM up to date
TestHelpers.delayInMilliseconds(
"wait 10 seconds for IAM policy to update",
IAM_PROPAGATION_DELAY); // wait to make sure IAM up to date
}

private static LakeFormationClient buildLakeFormationClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.aws.AwsIntegTestUtil;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.HttpClientProperties;
Expand Down Expand Up @@ -163,7 +164,9 @@ public void testLakeFormationEnabledGlueCatalog() throws Exception {
}
}

private void waitForIamConsistency() throws Exception {
Thread.sleep(IAM_PROPAGATION_DELAY); // sleep to make sure IAM up to date
private void waitForIamConsistency() {
TestHelpers.delayInMilliseconds(
"wait 10 seconds for IAM policy to update",
IAM_PROPAGATION_DELAY); // wait to make sure IAM up to date
}
}
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ project(':iceberg-api') {
compileOnly libs.findbugs.jsr305
testImplementation libs.avro.avro
testImplementation libs.esotericsoftware.kryo
testImplementation libs.awaitility
}

tasks.processTestResources.dependsOn rootProject.tasks.buildInfo
Expand Down Expand Up @@ -712,6 +713,7 @@ project(':iceberg-hive-metastore') {
}

testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation libs.awaitility
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.stream.IntStream;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -122,12 +123,8 @@ private static class CustomCommitService extends BaseCommitService<Integer> {

@Override
protected void commitOrClean(Set<Integer> batch) {
try {
// Slightly longer than timeout
Thread.sleep(210);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// Slightly longer than timeout
TestHelpers.delayInMilliseconds("wait for 210 milliseconds", 210);
}

@Override
Expand Down
Loading

0 comments on commit 6e76125

Please sign in to comment.