Skip to content

Commit

Permalink
AWS, Core: Use Awaitility instead of Thread.sleep()
Browse files Browse the repository at this point in the history
  • Loading branch information
nk1506 authored Oct 30, 2023
1 parent 3c62591 commit 7e9e02c
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.aws;

import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.aws.glue.GlueCatalog;
Expand All @@ -27,6 +28,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -42,6 +44,7 @@
import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
import software.amazon.awssdk.services.iam.model.DeleteRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.DeleteRoleRequest;
import software.amazon.awssdk.services.iam.model.GetRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.PutRolePolicyRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

Expand Down Expand Up @@ -100,7 +103,7 @@ public void after() {
}

@Test
public void testAssumeRoleGlueCatalog() throws Exception {
public void testAssumeRoleGlueCatalog() {
String glueArnPrefix = "arn:aws:glue:*:" + AwsIntegTestUtil.testAccountId();
iam.putRolePolicy(
PutRolePolicyRequest.builder()
Expand Down Expand Up @@ -189,7 +192,19 @@ public void testAssumeRoleS3FileIO() throws Exception {
Assert.assertFalse("should be able to access file", inputFile.exists());
}

private void waitForIamConsistency() throws Exception {
Thread.sleep(10000); // sleep to make sure IAM up to date
private void waitForIamConsistency() {
Awaitility.await("wait for IAM role policy to update.")
.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.ignoreExceptions()
.untilAsserted(
() ->
Assertions.assertThat(
iam.getRolePolicy(
GetRolePolicyRequest.builder()
.roleName(roleName)
.roleName(policyName)
.build()))
.isNotNull());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.aws.glue;

import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
Expand All @@ -38,6 +39,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.util.Tasks;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -132,21 +134,19 @@ public void testParallelCommitMultiThreadMultiCommit() {
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));

AtomicInteger barrier = new AtomicInteger(0);
Tasks.range(2)
int threadsCount = 2;
Tasks.range(threadsCount)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(executorService)
.run(
index -> {
for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
while (barrier.get() < numCommittedFiles * 2) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

final int currentFilesCount = numCommittedFiles;
Awaitility.await()
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofSeconds(10))
.until(() -> barrier.get() >= currentFilesCount * threadsCount);
table.newFastAppend().appendFile(file).commit();
barrier.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -37,6 +38,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
Expand Down Expand Up @@ -67,6 +70,7 @@
import software.amazon.awssdk.services.lakeformation.model.DataLocationResource;
import software.amazon.awssdk.services.lakeformation.model.DatabaseResource;
import software.amazon.awssdk.services.lakeformation.model.DeregisterResourceRequest;
import software.amazon.awssdk.services.lakeformation.model.DescribeResourceRequest;
import software.amazon.awssdk.services.lakeformation.model.EntityNotFoundException;
import software.amazon.awssdk.services.lakeformation.model.GetDataLakeSettingsRequest;
import software.amazon.awssdk.services.lakeformation.model.GetDataLakeSettingsResponse;
Expand Down Expand Up @@ -175,7 +179,7 @@ public static void beforeClass() throws Exception {
lfRegisterPathRoleIamPolicyName,
lfRegisterPathRolePolicyDocForIam(lfRegisterPathRoleArn),
lfRegisterPathRoleName);
waitForIamConsistency();
waitForIamConsistency(lfRegisterPathRoleName, lfRegisterPathRoleIamPolicyName);

// create lfPrivilegedRole
response =
Expand Down Expand Up @@ -205,7 +209,7 @@ public static void beforeClass() throws Exception {
lfPrivilegedRolePolicyName,
lfPrivilegedRolePolicyDoc(),
lfPrivilegedRoleName);
waitForIamConsistency();
waitForIamConsistency(lfPrivilegedRoleName, lfPrivilegedRolePolicyName);

// build lf and glue client with lfRegisterPathRole
lakeformation =
Expand Down Expand Up @@ -250,7 +254,6 @@ public static void beforeClass() throws Exception {
// register S3 test bucket path
deregisterResource(testBucketPath);
registerResource(testBucketPath);
waitForIamConsistency();
}

@AfterClass
Expand Down Expand Up @@ -357,8 +360,20 @@ String getRandomTableName() {
return LF_TEST_TABLE_PREFIX + UUID.randomUUID().toString().replace("-", "");
}

private static void waitForIamConsistency() throws Exception {
Thread.sleep(IAM_PROPAGATION_DELAY); // sleep to make sure IAM up to date
private static void waitForIamConsistency(String roleName, String policyName) {
// wait to make sure IAM up to date
Awaitility.await()
.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.untilAsserted(
() ->
Assertions.assertThat(
iam.getRolePolicy(
GetRolePolicyRequest.builder()
.roleName(roleName)
.policyName(policyName)
.build()))
.isNotNull());
}

private static LakeFormationClient buildLakeFormationClient(
Expand Down Expand Up @@ -417,7 +432,19 @@ private static void registerResource(String s3Location) {
.build());
// when a resource is registered, LF will update SLR with necessary permissions which has a
// propagation delay
waitForIamConsistency();
Awaitility.await()
.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.ignoreExceptions()
.untilAsserted(
() ->
Assertions.assertThat(
lakeformation
.describeResource(
DescribeResourceRequest.builder().resourceArn(arn).build())
.resourceInfo()
.roleArn())
.isEqualToIgnoringCase(lfRegisterPathRoleArn));
} catch (AlreadyExistsException e) {
LOG.warn("Resource {} already registered. Error: {}", arn, e.getMessage(), e);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.aws.lakeformation;

import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.aws.AwsIntegTestUtil;
Expand All @@ -26,6 +27,8 @@
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -41,6 +44,7 @@
import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
import software.amazon.awssdk.services.iam.model.DeleteRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.DeleteRoleRequest;
import software.amazon.awssdk.services.iam.model.GetRolePolicyRequest;
import software.amazon.awssdk.services.iam.model.PutRolePolicyRequest;

public class TestLakeFormationAwsClientFactory {
Expand Down Expand Up @@ -128,8 +132,18 @@ public void testLakeFormationEnabledGlueCatalog() throws Exception {
+ glueArnPrefix
+ ":userDefinedFunction/allowed_*/*\"]}]}")
.build());
waitForIamConsistency();

Awaitility.await()
.pollDelay(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(10))
.untilAsserted(
() ->
Assertions.assertThat(
iam.getRolePolicy(
GetRolePolicyRequest.builder()
.roleName(roleName)
.policyName(policyName)
.build()))
.isNotNull());
GlueCatalog glueCatalog = new GlueCatalog();
assumeRoleProperties.put("warehouse", "s3://path");
glueCatalog.initialize("test", assumeRoleProperties);
Expand Down Expand Up @@ -162,8 +176,4 @@ public void testLakeFormationEnabledGlueCatalog() throws Exception {
}
}
}

private void waitForIamConsistency() throws Exception {
Thread.sleep(IAM_PROPAGATION_DELAY); // sleep to make sure IAM up to date
}
}
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ project(':iceberg-api') {
compileOnly libs.findbugs.jsr305
testImplementation libs.avro.avro
testImplementation libs.esotericsoftware.kryo
testImplementation libs.awaitility
}

tasks.processTestResources.dependsOn rootProject.tasks.buildInfo
Expand Down Expand Up @@ -497,6 +498,7 @@ project(':iceberg-aws') {
testImplementation libs.mockserver.client.java
testImplementation libs.jaxb.api
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation libs.awaitility
}

sourceSets {
Expand Down Expand Up @@ -717,6 +719,7 @@ project(':iceberg-hive-metastore') {
}

testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation libs.awaitility
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand All @@ -54,6 +55,7 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Tasks;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
Expand Down Expand Up @@ -435,13 +437,11 @@ public void testConcurrentFastAppends(@TempDir File dir) throws Exception {
for (int numCommittedFiles = 0;
numCommittedFiles < numberOfCommitedFilesPerThread;
numCommittedFiles++) {
while (barrier.get() < numCommittedFiles * threadsCount) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
final int currentFilesCount = numCommittedFiles;
Awaitility.await()
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofSeconds(10))
.until(() -> barrier.get() >= currentFilesCount * threadsCount);
tableWithHighRetries.newFastAppend().appendFile(file).commit();
barrier.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
Expand All @@ -46,6 +47,7 @@
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Tasks;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

Expand Down Expand Up @@ -85,21 +87,19 @@ public synchronized void testConcurrentFastAppends() throws IOException {
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));

AtomicInteger barrier = new AtomicInteger(0);
Tasks.range(2)
int threadsCount = 2;
Tasks.range(threadsCount)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(executorService)
.run(
index -> {
for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
while (barrier.get() < numCommittedFiles * 2) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

final int currentFilesCount = numCommittedFiles;
Awaitility.await()
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofSeconds(10))
.until(() -> barrier.get() >= currentFilesCount * threadsCount);
icebergTable.newFastAppend().appendFile(file).commit();
barrier.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -35,6 +36,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.util.Tasks;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

public class TestHiveTableConcurrency extends HiveTableBaseTest {
Expand All @@ -56,21 +58,19 @@ public synchronized void testConcurrentFastAppends() {
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));

AtomicInteger barrier = new AtomicInteger(0);
Tasks.range(2)
int threadsCount = 2;
Tasks.range(threadsCount)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(executorService)
.run(
index -> {
for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
while (barrier.get() < numCommittedFiles * 2) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

final int currentFilesCount = numCommittedFiles;
Awaitility.await()
.pollInterval(Duration.ofMillis(10))
.atMost(Duration.ofSeconds(10))
.until(() -> barrier.get() >= currentFilesCount * threadsCount);
icebergTable.newFastAppend().appendFile(file).commit();
barrier.incrementAndGet();
}
Expand Down
Loading

0 comments on commit 7e9e02c

Please sign in to comment.