Skip to content

Commit

Permalink
Build: Replace Thread.Sleep() usage with org.Awaitility from Tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
nk1506 committed Oct 19, 2023
1 parent b5ea0d5 commit 9bc33bd
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 59 deletions.
19 changes: 6 additions & 13 deletions api/src/test/java/org/apache/iceberg/metrics/TestDefaultTimer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.iceberg.TestHelpers.waitUntilAfter;

import java.time.Duration;
import java.util.List;
Expand Down Expand Up @@ -87,12 +88,12 @@ public void multipleStops() {
}

@Test
public void closeableTimer() throws InterruptedException {
public void closeableTimer() {
Timer timer = new DefaultTimer(TimeUnit.NANOSECONDS);
Assertions.assertThat(timer.count()).isEqualTo(0);
Assertions.assertThat(timer.totalDuration()).isEqualTo(Duration.ZERO);
try (Timer.Timed sample = timer.start()) {
Thread.sleep(500L);
try (Timer.Timed ignored = timer.start()) {
waitUntilAfter(System.currentTimeMillis());
}
Assertions.assertThat(timer.count()).isEqualTo(1);
Assertions.assertThat(timer.totalDuration()).isGreaterThan(Duration.ZERO);
Expand Down Expand Up @@ -128,11 +129,7 @@ public void measureCallable() throws Exception {
Timer timer = new DefaultTimer(TimeUnit.NANOSECONDS);
Callable<Boolean> callable =
() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
waitUntilAfter(System.currentTimeMillis());
return true;
};
Assertions.assertThat(timer.count()).isEqualTo(0);
Expand All @@ -154,11 +151,7 @@ public void measureSupplier() {
Timer timer = new DefaultTimer(TimeUnit.NANOSECONDS);
Supplier<Boolean> supplier =
() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
waitUntilAfter(System.currentTimeMillis());
return true;
};
Assertions.assertThat(timer.count()).isEqualTo(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.aws;

import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.aws.glue.GlueCatalog;
Expand All @@ -27,6 +28,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -42,6 +44,7 @@
import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
import software.amazon.awssdk.services.iam.model.DeleteRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.DeleteRoleRequest;
import software.amazon.awssdk.services.iam.model.GetRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.PutRolePolicyRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

Expand Down Expand Up @@ -100,7 +103,7 @@ public void after() {
}

@Test
public void testAssumeRoleGlueCatalog() throws Exception {
public void testAssumeRoleGlueCatalog() {
String glueArnPrefix = "arn:aws:glue:*:" + AwsIntegTestUtil.testAccountId();
iam.putRolePolicy(
PutRolePolicyRequest.builder()
Expand Down Expand Up @@ -189,7 +192,18 @@ public void testAssumeRoleS3FileIO() throws Exception {
Assert.assertFalse("should be able to access file", inputFile.exists());
}

private void waitForIamConsistency() throws Exception {
Thread.sleep(10000); // sleep to make sure IAM up to date
private void waitForIamConsistency() {
Awaitility.await("wait for IAM role policy to update.")
.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.until(
() ->
iam.getRolePolicy(
GetRolePolicyRequest.builder()
.roleName(roleName)
.roleName(policyName)
.build())
.roleName()
.equalsIgnoreCase(roleName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.aws.glue;

import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
Expand All @@ -38,6 +39,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.util.Tasks;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -139,14 +141,11 @@ public void testParallelCommitMultiThreadMultiCommit() {
.run(
index -> {
for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
while (barrier.get() < numCommittedFiles * 2) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

final int currentFilesCount = numCommittedFiles;
Awaitility.await()
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofMillis(1000))
.until(() -> barrier.get() >= currentFilesCount * 2);
table.newFastAppend().appendFile(file).commit();
barrier.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -37,6 +38,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
Expand Down Expand Up @@ -67,6 +69,7 @@
import software.amazon.awssdk.services.lakeformation.model.DataLocationResource;
import software.amazon.awssdk.services.lakeformation.model.DatabaseResource;
import software.amazon.awssdk.services.lakeformation.model.DeregisterResourceRequest;
import software.amazon.awssdk.services.lakeformation.model.DescribeResourceRequest;
import software.amazon.awssdk.services.lakeformation.model.EntityNotFoundException;
import software.amazon.awssdk.services.lakeformation.model.GetDataLakeSettingsRequest;
import software.amazon.awssdk.services.lakeformation.model.GetDataLakeSettingsResponse;
Expand Down Expand Up @@ -175,7 +178,7 @@ public static void beforeClass() throws Exception {
lfRegisterPathRoleIamPolicyName,
lfRegisterPathRolePolicyDocForIam(lfRegisterPathRoleArn),
lfRegisterPathRoleName);
waitForIamConsistency();
waitForIamConsistency(lfRegisterPathRoleName, lfRegisterPathRoleIamPolicyName);

// create lfPrivilegedRole
response =
Expand Down Expand Up @@ -205,7 +208,7 @@ public static void beforeClass() throws Exception {
lfPrivilegedRolePolicyName,
lfPrivilegedRolePolicyDoc(),
lfPrivilegedRoleName);
waitForIamConsistency();
waitForIamConsistency(lfPrivilegedRoleName, lfPrivilegedRolePolicyName);

// build lf and glue client with lfRegisterPathRole
lakeformation =
Expand Down Expand Up @@ -250,7 +253,6 @@ public static void beforeClass() throws Exception {
// register S3 test bucket path
deregisterResource(testBucketPath);
registerResource(testBucketPath);
waitForIamConsistency();
}

@AfterClass
Expand Down Expand Up @@ -357,8 +359,20 @@ String getRandomTableName() {
return LF_TEST_TABLE_PREFIX + UUID.randomUUID().toString().replace("-", "");
}

private static void waitForIamConsistency() throws Exception {
Thread.sleep(IAM_PROPAGATION_DELAY); // sleep to make sure IAM up to date
private static void waitForIamConsistency(String roleName, String policyName) {
// wait to make sure IAM up to date
Awaitility.await()
.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.until(
() ->
iam.getRolePolicy(
GetRolePolicyRequest.builder()
.roleName(roleName)
.policyName(policyName)
.build())
.roleName()
.equalsIgnoreCase(roleName));
}

private static LakeFormationClient buildLakeFormationClient(
Expand Down Expand Up @@ -417,7 +431,16 @@ private static void registerResource(String s3Location) {
.build());
// when a resource is registered, LF will update SLR with necessary permissions which has a
// propagation delay
waitForIamConsistency();
Awaitility.await()
.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.until(
() ->
lakeformation
.describeResource(DescribeResourceRequest.builder().resourceArn(arn).build())
.resourceInfo()
.roleArn()
.equalsIgnoreCase(lfRegisterPathRoleArn));
} catch (AlreadyExistsException e) {
LOG.warn("Resource {} already registered. Error: {}", arn, e.getMessage(), e);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.aws.lakeformation;

import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.aws.AwsIntegTestUtil;
Expand All @@ -26,6 +27,7 @@
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -41,6 +43,7 @@
import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
import software.amazon.awssdk.services.iam.model.DeleteRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.DeleteRoleRequest;
import software.amazon.awssdk.services.iam.model.GetRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.PutRolePolicyRequest;

public class TestLakeFormationAwsClientFactory {
Expand Down Expand Up @@ -128,8 +131,18 @@ public void testLakeFormationEnabledGlueCatalog() throws Exception {
+ glueArnPrefix
+ ":userDefinedFunction/allowed_*/*\"]}]}")
.build());
waitForIamConsistency();

Awaitility.await()
.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.until(
() ->
iam.getRolePolicy(
GetRolePolicyRequest.builder()
.roleName(roleName)
.policyName(policyName)
.build())
.roleName()
.equalsIgnoreCase(roleName));
GlueCatalog glueCatalog = new GlueCatalog();
assumeRoleProperties.put("warehouse", "s3://path");
glueCatalog.initialize("test", assumeRoleProperties);
Expand Down Expand Up @@ -162,8 +175,4 @@ public void testLakeFormationEnabledGlueCatalog() throws Exception {
}
}
}

private void waitForIamConsistency() throws Exception {
Thread.sleep(IAM_PROPAGATION_DELAY); // sleep to make sure IAM up to date
}
}
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ project(':iceberg-api') {
compileOnly libs.findbugs.jsr305
testImplementation libs.avro.avro
testImplementation libs.esotericsoftware.kryo
testImplementation libs.awaitility
}

tasks.processTestResources.dependsOn rootProject.tasks.buildInfo
Expand Down Expand Up @@ -492,6 +493,7 @@ project(':iceberg-aws') {
testImplementation libs.mockserver.client.java
testImplementation libs.jaxb.api
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation libs.awaitility
}

sourceSets {
Expand Down Expand Up @@ -712,6 +714,7 @@ project(':iceberg-hive-metastore') {
}

testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation libs.awaitility
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand All @@ -54,6 +55,7 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Tasks;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
Expand Down Expand Up @@ -435,13 +437,11 @@ public void testConcurrentFastAppends(@TempDir File dir) throws Exception {
for (int numCommittedFiles = 0;
numCommittedFiles < numberOfCommitedFilesPerThread;
numCommittedFiles++) {
while (barrier.get() < numCommittedFiles * threadsCount) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
final int currentFilesCount = numCommittedFiles;
Awaitility.await()
.pollInterval(Duration.ofMillis(10))
.pollInSameThread()
.until(() -> barrier.get() >= currentFilesCount * threadsCount);
tableWithHighRetries.newFastAppend().appendFile(file).commit();
barrier.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
Expand All @@ -46,6 +47,7 @@
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Tasks;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

Expand Down Expand Up @@ -92,14 +94,11 @@ public synchronized void testConcurrentFastAppends() throws IOException {
.run(
index -> {
for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
while (barrier.get() < numCommittedFiles * 2) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

final int currentFilesCount = numCommittedFiles;
Awaitility.await()
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofMillis(1000))
.until(() -> barrier.get() >= currentFilesCount * 2);
icebergTable.newFastAppend().appendFile(file).commit();
barrier.incrementAndGet();
}
Expand Down
Loading

0 comments on commit 9bc33bd

Please sign in to comment.