From 7e9e02c0e3f1304be89d094b0eab238da46c34ab Mon Sep 17 00:00:00 2001
From: Naveen Kumar
Date: Mon, 30 Oct 2023 17:03:11 +0530
Subject: [PATCH] AWS, Core: Use Awaitility instead of Thread.sleep()

---
 .../aws/TestAssumeRoleAwsClientFactory.java   | 21 ++++++++--
 .../iceberg/aws/glue/TestGlueCatalogLock.java | 18 ++++-----
 .../lakeformation/LakeFormationTestBase.java  | 39 ++++++++++++++++---
 .../TestLakeFormationAwsClientFactory.java    | 22 ++++++++---
 build.gradle                                  |  3 ++
 .../iceberg/hadoop/TestHadoopCommits.java     | 14 +++----
 .../jdbc/TestJdbcTableConcurrency.java        | 18 ++++-----
 .../hive/TestHiveTableConcurrency.java        | 18 ++++-----
 .../spark/source/SparkSQLExecutionHelper.java |  2 +-
 .../spark/source/SparkSQLExecutionHelper.java |  2 +-
 .../spark/source/SparkSQLExecutionHelper.java |  2 +-
 .../spark/source/SparkSQLExecutionHelper.java |  2 +-
 12 files changed, 108 insertions(+), 53 deletions(-)

diff --git a/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java b/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java
index 4f2b3b8a1f79..99687777f67b 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/TestAssumeRoleAwsClientFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.aws;
 
+import java.time.Duration;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.iceberg.aws.glue.GlueCatalog;
@@ -27,6 +28,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.assertj.core.api.Assertions;
 import org.assertj.core.api.InstanceOfAssertFactories;
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -42,6 +44,7 @@
 import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
 import software.amazon.awssdk.services.iam.model.DeleteRolePolicyRequest;
 import software.amazon.awssdk.services.iam.model.DeleteRoleRequest;
+import software.amazon.awssdk.services.iam.model.GetRolePolicyRequest;
 import software.amazon.awssdk.services.iam.model.PutRolePolicyRequest;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 
@@ -100,7 +103,7 @@ public void after() {
   }
 
   @Test
-  public void testAssumeRoleGlueCatalog() throws Exception {
+  public void testAssumeRoleGlueCatalog() {
     String glueArnPrefix = "arn:aws:glue:*:" + AwsIntegTestUtil.testAccountId();
     iam.putRolePolicy(
         PutRolePolicyRequest.builder()
@@ -189,7 +192,19 @@ public void testAssumeRoleS3FileIO() throws Exception {
     Assert.assertFalse("should be able to access file", inputFile.exists());
   }
 
-  private void waitForIamConsistency() throws Exception {
-    Thread.sleep(10000); // sleep to make sure IAM up to date
+  private void waitForIamConsistency() {
+    Awaitility.await("wait for IAM role policy to update.")
+        .pollDelay(Duration.ofSeconds(1))
+        .atMost(Duration.ofSeconds(10))
+        .ignoreExceptions()
+        .untilAsserted(
+            () ->
+                Assertions.assertThat(
+                        iam.getRolePolicy(
+                            GetRolePolicyRequest.builder()
+                                .roleName(roleName)
+                                .policyName(policyName)
+                                .build()))
+                    .isNotNull());
   }
 }
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogLock.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogLock.java
index 4c48a277192d..825f2a330533 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogLock.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogLock.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.aws.glue;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -38,6 +39,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.iceberg.util.Tasks;
+import org.awaitility.Awaitility;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -132,21 +134,19 @@ public void testParallelCommitMultiThreadMultiCommit() {
             (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
 
     AtomicInteger barrier = new AtomicInteger(0);
-    Tasks.range(2)
+    int threadsCount = 2;
+    Tasks.range(threadsCount)
         .stopOnFailure()
         .throwFailureWhenFinished()
         .executeWith(executorService)
         .run(
             index -> {
               for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
-                while (barrier.get() < numCommittedFiles * 2) {
-                  try {
-                    Thread.sleep(10);
-                  } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                  }
-                }
-
+                final int currentFilesCount = numCommittedFiles;
+                Awaitility.await()
+                    .pollInterval(Duration.ofMillis(10))
+                    .atMost(Duration.ofSeconds(10))
+                    .until(() -> barrier.get() >= currentFilesCount * threadsCount);
                 table.newFastAppend().appendFile(file).commit();
                 barrier.incrementAndGet();
               }
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/LakeFormationTestBase.java b/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/LakeFormationTestBase.java
index 431ee91bd22f..67586943c4d7 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/LakeFormationTestBase.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/LakeFormationTestBase.java
@@ -21,6 +21,7 @@
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -37,6 +38,8 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
@@ -67,6 +70,7 @@
 import software.amazon.awssdk.services.lakeformation.model.DataLocationResource;
 import software.amazon.awssdk.services.lakeformation.model.DatabaseResource;
 import software.amazon.awssdk.services.lakeformation.model.DeregisterResourceRequest;
+import software.amazon.awssdk.services.lakeformation.model.DescribeResourceRequest;
 import software.amazon.awssdk.services.lakeformation.model.EntityNotFoundException;
 import software.amazon.awssdk.services.lakeformation.model.GetDataLakeSettingsRequest;
 import software.amazon.awssdk.services.lakeformation.model.GetDataLakeSettingsResponse;
@@ -175,7 +179,7 @@ public static void beforeClass() throws Exception {
         lfRegisterPathRoleIamPolicyName,
         lfRegisterPathRolePolicyDocForIam(lfRegisterPathRoleArn),
         lfRegisterPathRoleName);
-    waitForIamConsistency();
+    waitForIamConsistency(lfRegisterPathRoleName, lfRegisterPathRoleIamPolicyName);
 
     // create lfPrivilegedRole
     response =
@@ -205,7 +209,7 @@ public static void beforeClass() throws Exception {
         lfPrivilegedRolePolicyName,
         lfPrivilegedRolePolicyDoc(),
         lfPrivilegedRoleName);
-    waitForIamConsistency();
+    waitForIamConsistency(lfPrivilegedRoleName, lfPrivilegedRolePolicyName);
 
     // build lf and glue client with lfRegisterPathRole
     lakeformation =
@@ -250,7 +254,6 @@ public static void beforeClass() throws Exception {
     // register S3 test bucket path
     deregisterResource(testBucketPath);
     registerResource(testBucketPath);
-    waitForIamConsistency();
   }
 
   @AfterClass
@@ -357,8 +360,20 @@ String getRandomTableName() {
     return LF_TEST_TABLE_PREFIX + UUID.randomUUID().toString().replace("-", "");
   }
 
-  private static void waitForIamConsistency() throws Exception {
-    Thread.sleep(IAM_PROPAGATION_DELAY); // sleep to make sure IAM up to date
+  private static void waitForIamConsistency(String roleName, String policyName) {
+    // wait to make sure IAM is up to date
+    Awaitility.await()
+        .pollDelay(Duration.ofSeconds(1))
+        .atMost(Duration.ofSeconds(10))
+        .untilAsserted(
+            () ->
+                Assertions.assertThat(
+                        iam.getRolePolicy(
+                            GetRolePolicyRequest.builder()
+                                .roleName(roleName)
+                                .policyName(policyName)
+                                .build()))
+                    .isNotNull());
   }
 
   private static LakeFormationClient buildLakeFormationClient(
@@ -417,7 +432,19 @@ private static void registerResource(String s3Location) {
               .build());
       // when a resource is registered, LF will update SLR with necessary permissions which has a
      // propagation delay
-      waitForIamConsistency();
+      Awaitility.await()
+          .pollDelay(Duration.ofSeconds(1))
+          .atMost(Duration.ofSeconds(10))
+          .ignoreExceptions()
+          .untilAsserted(
+              () ->
+                  Assertions.assertThat(
+                          lakeformation
+                              .describeResource(
+                                  DescribeResourceRequest.builder().resourceArn(arn).build())
+                              .resourceInfo()
+                              .roleArn())
+                      .isEqualToIgnoringCase(lfRegisterPathRoleArn));
     } catch (AlreadyExistsException e) {
       LOG.warn("Resource {} already registered. Error: {}", arn, e.getMessage(), e);
     } catch (Exception e) {
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/TestLakeFormationAwsClientFactory.java b/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/TestLakeFormationAwsClientFactory.java
index 9c59ba1aa19e..f8d88901b9f8 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/TestLakeFormationAwsClientFactory.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/lakeformation/TestLakeFormationAwsClientFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.aws.lakeformation;
 
+import java.time.Duration;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.iceberg.aws.AwsIntegTestUtil;
@@ -26,6 +27,8 @@
 import org.apache.iceberg.aws.glue.GlueCatalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -41,6 +44,7 @@
 import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
 import software.amazon.awssdk.services.iam.model.DeleteRolePolicyRequest;
 import software.amazon.awssdk.services.iam.model.DeleteRoleRequest;
+import software.amazon.awssdk.services.iam.model.GetRolePolicyRequest;
 import software.amazon.awssdk.services.iam.model.PutRolePolicyRequest;
 
 public class TestLakeFormationAwsClientFactory {
@@ -128,8 +132,18 @@ public void testLakeFormationEnabledGlueCatalog() throws Exception {
                 + glueArnPrefix
                 + ":userDefinedFunction/allowed_*/*\"]}]}")
             .build());
-    waitForIamConsistency();
-
+    Awaitility.await()
+        .pollDelay(Duration.ofSeconds(1))
+        .atMost(Duration.ofSeconds(10))
+        .untilAsserted(
+            () ->
+                Assertions.assertThat(
+                        iam.getRolePolicy(
+                            GetRolePolicyRequest.builder()
+                                .roleName(roleName)
+                                .policyName(policyName)
+                                .build()))
+                    .isNotNull());
     GlueCatalog glueCatalog = new GlueCatalog();
     assumeRoleProperties.put("warehouse", "s3://path");
     glueCatalog.initialize("test", assumeRoleProperties);
@@ -162,8 +176,4 @@ public void testLakeFormationEnabledGlueCatalog() throws Exception {
       }
     }
   }
-
-  private void waitForIamConsistency() throws Exception {
-    Thread.sleep(IAM_PROPAGATION_DELAY); // sleep to make sure IAM up to date
-  }
 }
diff --git a/build.gradle b/build.gradle
index 689655f332be..679ffc6f2fc8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -298,6 +298,7 @@ project(':iceberg-api') {
   compileOnly libs.findbugs.jsr305
   testImplementation libs.avro.avro
   testImplementation libs.esotericsoftware.kryo
+  testImplementation libs.awaitility
 }
 
 tasks.processTestResources.dependsOn rootProject.tasks.buildInfo
@@ -497,6 +498,7 @@ project(':iceberg-aws') {
     testImplementation libs.mockserver.client.java
     testImplementation libs.jaxb.api
     testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
+    testImplementation libs.awaitility
   }
 
   sourceSets {
@@ -717,6 +719,7 @@ project(':iceberg-hive-metastore') {
     }
 
     testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
+    testImplementation libs.awaitility
   }
 }
 
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
index b9c23f33a583..e02b9deaee85 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
@@ -28,6 +28,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -54,6 +55,7 @@
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Tasks;
 import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.mockito.Mockito;
@@ -435,13 +437,11 @@ public void testConcurrentFastAppends(@TempDir File dir) throws Exception {
               for (int numCommittedFiles = 0;
                   numCommittedFiles < numberOfCommitedFilesPerThread;
                   numCommittedFiles++) {
-                while (barrier.get() < numCommittedFiles * threadsCount) {
-                  try {
-                    Thread.sleep(10);
-                  } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                  }
-                }
+                final int currentFilesCount = numCommittedFiles;
+                Awaitility.await()
+                    .pollInterval(Duration.ofMillis(10))
+                    .atMost(Duration.ofSeconds(10))
+                    .until(() -> barrier.get() >= currentFilesCount * threadsCount);
                 tableWithHighRetries.newFastAppend().appendFile(file).commit();
                 barrier.incrementAndGet();
               }
diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcTableConcurrency.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcTableConcurrency.java
index b4c5677dff80..16321c1fa855 100644
--- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcTableConcurrency.java
+++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcTableConcurrency.java
@@ -26,6 +26,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -46,6 +47,7 @@
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Tasks;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -85,21 +87,19 @@ public synchronized void testConcurrentFastAppends() throws IOException {
             (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
 
     AtomicInteger barrier = new AtomicInteger(0);
-    Tasks.range(2)
+    int threadsCount = 2;
+    Tasks.range(threadsCount)
         .stopOnFailure()
         .throwFailureWhenFinished()
         .executeWith(executorService)
         .run(
             index -> {
               for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
-                while (barrier.get() < numCommittedFiles * 2) {
-                  try {
-                    Thread.sleep(10);
-                  } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                  }
-                }
-
+                final int currentFilesCount = numCommittedFiles;
+                Awaitility.await()
+                    .pollInterval(Duration.ofMillis(10))
+                    .atMost(Duration.ofSeconds(10))
+                    .until(() -> barrier.get() >= currentFilesCount * threadsCount);
                 icebergTable.newFastAppend().appendFile(file).commit();
                 barrier.incrementAndGet();
               }
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java
index e7608962cb85..8cc2575b1bbe 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java
@@ -23,6 +23,7 @@
 import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Duration;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -35,6 +36,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.iceberg.util.Tasks;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
 public class TestHiveTableConcurrency extends HiveTableBaseTest {
@@ -56,21 +58,19 @@ public synchronized void testConcurrentFastAppends() {
             (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
 
     AtomicInteger barrier = new AtomicInteger(0);
-    Tasks.range(2)
+    int threadsCount = 2;
+    Tasks.range(threadsCount)
         .stopOnFailure()
         .throwFailureWhenFinished()
         .executeWith(executorService)
         .run(
             index -> {
               for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
-                while (barrier.get() < numCommittedFiles * 2) {
-                  try {
-                    Thread.sleep(10);
-                  } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                  }
-                }
-
+                final int currentFilesCount = numCommittedFiles;
+                Awaitility.await()
+                    .pollInterval(Duration.ofMillis(10))
+                    .atMost(Duration.ofSeconds(10))
+                    .until(() -> barrier.get() >= currentFilesCount * threadsCount);
                 icebergTable.newFastAppend().appendFile(file).commit();
                 barrier.incrementAndGet();
               }
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java
index cdbc198b6369..cdc380b1b6be 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java
@@ -53,7 +53,7 @@ public static String lastExecutedMetricValue(SparkSession spark, String metricNa
     // Refresh metricValues, they will remain null until the execution is complete and metrics are
     // aggregated
     Awaitility.await()
-        .atMost(Duration.ofMillis(500))
+        .atMost(Duration.ofSeconds(3))
         .pollInterval(Duration.ofMillis(100))
         .untilAsserted(
             () -> assertThat(statusStore.execution(lastExecution.executionId()).get()).isNotNull());
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java
index cdbc198b6369..cdc380b1b6be 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java
@@ -53,7 +53,7 @@ public static String lastExecutedMetricValue(SparkSession spark, String metricNa
     // Refresh metricValues, they will remain null until the execution is complete and metrics are
     // aggregated
     Awaitility.await()
-        .atMost(Duration.ofMillis(500))
+        .atMost(Duration.ofSeconds(3))
         .pollInterval(Duration.ofMillis(100))
         .untilAsserted(
             () -> assertThat(statusStore.execution(lastExecution.executionId()).get()).isNotNull());
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java
index cdbc198b6369..cdc380b1b6be 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java
@@ -53,7 +53,7 @@ public static String lastExecutedMetricValue(SparkSession spark, String metricNa
     // Refresh metricValues, they will remain null until the execution is complete and metrics are
     // aggregated
     Awaitility.await()
-        .atMost(Duration.ofMillis(500))
+        .atMost(Duration.ofSeconds(3))
         .pollInterval(Duration.ofMillis(100))
         .untilAsserted(
             () -> assertThat(statusStore.execution(lastExecution.executionId()).get()).isNotNull());
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java
index cdbc198b6369..cdc380b1b6be 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java
@@ -53,7 +53,7 @@ public static String lastExecutedMetricValue(SparkSession spark, String metricNa
     // Refresh metricValues, they will remain null until the execution is complete and metrics are
     // aggregated
     Awaitility.await()
-        .atMost(Duration.ofMillis(500))
+        .atMost(Duration.ofSeconds(3))
         .pollInterval(Duration.ofMillis(100))
         .untilAsserted(
             () -> assertThat(statusStore.execution(lastExecution.executionId()).get()).isNotNull());