Skip to content

Commit

Permalink
Build: Replace Thread.Sleep() usage with org.Awaitility from Tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
nk1506 committed Oct 28, 2023
1 parent b5ea0d5 commit 37258a8
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.aws;

import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.aws.glue.GlueCatalog;
Expand All @@ -27,6 +28,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -42,6 +44,7 @@
import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
import software.amazon.awssdk.services.iam.model.DeleteRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.DeleteRoleRequest;
import software.amazon.awssdk.services.iam.model.GetRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.PutRolePolicyRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

Expand Down Expand Up @@ -100,7 +103,7 @@ public void after() {
}

@Test
public void testAssumeRoleGlueCatalog() throws Exception {
public void testAssumeRoleGlueCatalog() {
String glueArnPrefix = "arn:aws:glue:*:" + AwsIntegTestUtil.testAccountId();
iam.putRolePolicy(
PutRolePolicyRequest.builder()
Expand Down Expand Up @@ -189,7 +192,19 @@ public void testAssumeRoleS3FileIO() throws Exception {
Assert.assertFalse("should be able to access file", inputFile.exists());
}

private void waitForIamConsistency() throws Exception {
Thread.sleep(10000); // sleep to make sure IAM up to date
private void waitForIamConsistency() {
Awaitility.await("wait for IAM role policy to update.")
.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.ignoreExceptions()
.untilAsserted(
() ->
Assertions.assertThat(
iam.getRolePolicy(
GetRolePolicyRequest.builder()
.roleName(roleName)
.roleName(policyName)
.build()))
.isNotNull());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.aws.glue;

import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
Expand All @@ -38,6 +39,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.util.Tasks;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -132,21 +134,20 @@ public void testParallelCommitMultiThreadMultiCommit() {
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));

AtomicInteger barrier = new AtomicInteger(0);
Tasks.range(2)
int threadsCount = 2;
Tasks.range(threadsCount)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(executorService)
.run(
index -> {
for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
while (barrier.get() < numCommittedFiles * 2) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

final int currentFilesCount = numCommittedFiles;
Awaitility.await()
.pollInSameThread()
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofSeconds(300))
.until(() -> barrier.get() >= currentFilesCount * threadsCount);
table.newFastAppend().appendFile(file).commit();
barrier.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -37,6 +38,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
Expand Down Expand Up @@ -67,6 +70,7 @@
import software.amazon.awssdk.services.lakeformation.model.DataLocationResource;
import software.amazon.awssdk.services.lakeformation.model.DatabaseResource;
import software.amazon.awssdk.services.lakeformation.model.DeregisterResourceRequest;
import software.amazon.awssdk.services.lakeformation.model.DescribeResourceRequest;
import software.amazon.awssdk.services.lakeformation.model.EntityNotFoundException;
import software.amazon.awssdk.services.lakeformation.model.GetDataLakeSettingsRequest;
import software.amazon.awssdk.services.lakeformation.model.GetDataLakeSettingsResponse;
Expand Down Expand Up @@ -175,7 +179,7 @@ public static void beforeClass() throws Exception {
lfRegisterPathRoleIamPolicyName,
lfRegisterPathRolePolicyDocForIam(lfRegisterPathRoleArn),
lfRegisterPathRoleName);
waitForIamConsistency();
waitForIamConsistency(lfRegisterPathRoleName, lfRegisterPathRoleIamPolicyName);

// create lfPrivilegedRole
response =
Expand Down Expand Up @@ -205,7 +209,7 @@ public static void beforeClass() throws Exception {
lfPrivilegedRolePolicyName,
lfPrivilegedRolePolicyDoc(),
lfPrivilegedRoleName);
waitForIamConsistency();
waitForIamConsistency(lfPrivilegedRoleName, lfPrivilegedRolePolicyName);

// build lf and glue client with lfRegisterPathRole
lakeformation =
Expand Down Expand Up @@ -250,7 +254,6 @@ public static void beforeClass() throws Exception {
// register S3 test bucket path
deregisterResource(testBucketPath);
registerResource(testBucketPath);
waitForIamConsistency();
}

@AfterClass
Expand Down Expand Up @@ -357,8 +360,20 @@ String getRandomTableName() {
return LF_TEST_TABLE_PREFIX + UUID.randomUUID().toString().replace("-", "");
}

private static void waitForIamConsistency() throws Exception {
Thread.sleep(IAM_PROPAGATION_DELAY); // sleep to make sure IAM up to date
private static void waitForIamConsistency(String roleName, String policyName) {
// wait to make sure IAM up to date
Awaitility.await()
.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.untilAsserted(
() ->
Assertions.assertThat(
iam.getRolePolicy(
GetRolePolicyRequest.builder()
.roleName(roleName)
.policyName(policyName)
.build()))
.isNotNull());
}

private static LakeFormationClient buildLakeFormationClient(
Expand Down Expand Up @@ -417,7 +432,19 @@ private static void registerResource(String s3Location) {
.build());
// when a resource is registered, LF will update SLR with necessary permissions which has a
// propagation delay
waitForIamConsistency();
Awaitility.await()
.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.ignoreExceptions()
.untilAsserted(
() ->
Assertions.assertThat(
lakeformation
.describeResource(
DescribeResourceRequest.builder().resourceArn(arn).build())
.resourceInfo()
.roleArn())
.isEqualToIgnoringCase(lfRegisterPathRoleArn));
} catch (AlreadyExistsException e) {
LOG.warn("Resource {} already registered. Error: {}", arn, e.getMessage(), e);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.aws.lakeformation;

import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.aws.AwsIntegTestUtil;
Expand All @@ -26,6 +27,8 @@
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -41,6 +44,7 @@
import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
import software.amazon.awssdk.services.iam.model.DeleteRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.DeleteRoleRequest;
import software.amazon.awssdk.services.iam.model.GetRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.PutRolePolicyRequest;

public class TestLakeFormationAwsClientFactory {
Expand Down Expand Up @@ -128,8 +132,18 @@ public void testLakeFormationEnabledGlueCatalog() throws Exception {
+ glueArnPrefix
+ ":userDefinedFunction/allowed_*/*\"]}]}")
.build());
waitForIamConsistency();

Awaitility.await()
.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.untilAsserted(
() ->
Assertions.assertThat(
iam.getRolePolicy(
GetRolePolicyRequest.builder()
.roleName(roleName)
.policyName(policyName)
.build()))
.isNotNull());
GlueCatalog glueCatalog = new GlueCatalog();
assumeRoleProperties.put("warehouse", "s3://path");
glueCatalog.initialize("test", assumeRoleProperties);
Expand Down Expand Up @@ -162,8 +176,4 @@ public void testLakeFormationEnabledGlueCatalog() throws Exception {
}
}
}

private void waitForIamConsistency() throws Exception {
Thread.sleep(IAM_PROPAGATION_DELAY); // sleep to make sure IAM up to date
}
}
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ project(':iceberg-api') {
compileOnly libs.findbugs.jsr305
testImplementation libs.avro.avro
testImplementation libs.esotericsoftware.kryo
testImplementation libs.awaitility
}

tasks.processTestResources.dependsOn rootProject.tasks.buildInfo
Expand Down Expand Up @@ -492,6 +493,7 @@ project(':iceberg-aws') {
testImplementation libs.mockserver.client.java
testImplementation libs.jaxb.api
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation libs.awaitility
}

sourceSets {
Expand Down Expand Up @@ -712,6 +714,7 @@ project(':iceberg-hive-metastore') {
}

testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation libs.awaitility
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand All @@ -54,6 +55,7 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Tasks;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
Expand Down Expand Up @@ -435,13 +437,12 @@ public void testConcurrentFastAppends(@TempDir File dir) throws Exception {
for (int numCommittedFiles = 0;
numCommittedFiles < numberOfCommitedFilesPerThread;
numCommittedFiles++) {
while (barrier.get() < numCommittedFiles * threadsCount) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
final int currentFilesCount = numCommittedFiles;
Awaitility.await()
.pollInSameThread()
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofSeconds(300))
.until(() -> barrier.get() >= currentFilesCount * threadsCount);
tableWithHighRetries.newFastAppend().appendFile(file).commit();
barrier.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
Expand All @@ -46,6 +47,7 @@
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Tasks;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

Expand Down Expand Up @@ -85,21 +87,20 @@ public synchronized void testConcurrentFastAppends() throws IOException {
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));

AtomicInteger barrier = new AtomicInteger(0);
Tasks.range(2)
int threadsCount = 2;
Tasks.range(threadsCount)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(executorService)
.run(
index -> {
for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
while (barrier.get() < numCommittedFiles * 2) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

final int currentFilesCount = numCommittedFiles;
Awaitility.await()
.pollInSameThread()
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofSeconds(300))
.until(() -> barrier.get() >= currentFilesCount * threadsCount);
icebergTable.newFastAppend().appendFile(file).commit();
barrier.incrementAndGet();
}
Expand Down
Loading

0 comments on commit 37258a8

Please sign in to comment.